xref: /spdk/lib/bdev/bdev.c (revision afaabcce2388835082b8653b595898f9ca8c3c24)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/thread.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 #include "spdk/trace.h"
48 
49 #include "spdk/bdev_module.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 
66 #define OWNER_BDEV		0x2
67 
68 #define OBJECT_BDEV_IO		0x2
69 
70 #define TRACE_GROUP_BDEV	0x3
71 #define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
72 #define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)
73 
74 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
75 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
76 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
77 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
78 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10
79 
80 enum spdk_bdev_qos_type {
81 	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
82 	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
83 	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
84 };
85 
86 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};
87 
88 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
89 
90 struct spdk_bdev_mgr {
91 	struct spdk_mempool *bdev_io_pool;
92 
93 	struct spdk_mempool *buf_small_pool;
94 	struct spdk_mempool *buf_large_pool;
95 
96 	void *zero_buffer;
97 
98 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
99 
100 	struct spdk_bdev_list bdevs;
101 
102 	bool init_complete;
103 	bool module_init_complete;
104 
105 #ifdef SPDK_CONFIG_VTUNE
106 	__itt_domain	*domain;
107 #endif
108 };
109 
110 static struct spdk_bdev_mgr g_bdev_mgr = {
111 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
112 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
113 	.init_complete = false,
114 	.module_init_complete = false,
115 };
116 
117 static struct spdk_bdev_opts	g_bdev_opts = {
118 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
119 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
120 };
121 
122 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
123 static void			*g_init_cb_arg = NULL;
124 
125 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
126 static void			*g_fini_cb_arg = NULL;
127 static struct spdk_thread	*g_fini_thread = NULL;
128 
129 struct spdk_bdev_qos {
130 	/** Rate limit, in I/O per second */
131 	uint64_t iops_rate_limit;
132 
133 	/** Rate limit, in byte per second */
134 	uint64_t byte_rate_limit;
135 
136 	/** The channel that all I/O are funneled through */
137 	struct spdk_bdev_channel *ch;
138 
139 	/** The thread on which the poller is running. */
140 	struct spdk_thread *thread;
141 
142 	/** Queue of I/O waiting to be issued. */
143 	bdev_io_tailq_t queued;
144 
145 	/** Size of a timeslice in tsc ticks. */
146 	uint64_t timeslice_size;
147 
148 	/** Timestamp of start of last timeslice. */
149 	uint64_t last_timeslice;
150 
151 	/** Maximum number of I/Os allowed to be issued in one timeslice (e.g., 1ms);
152 	 *  only valid for the master channel, which manages the outstanding I/Os. */
153 	uint64_t max_ios_per_timeslice;
154 
155 	/** Maximum number of bytes allowed to be issued in one timeslice (e.g., 1ms);
156 	 *  only valid for the master channel, which manages the outstanding I/Os. */
157 	uint64_t max_byte_per_timeslice;
158 
159 	/** Remaining IO allowed in current timeslice (e.g., 1ms) */
160 	uint64_t io_remaining_this_timeslice;
161 
162 	/** Remaining bytes allowed in current timeslice (e.g., 1ms).
163 	 *  Allowed to run negative if an I/O is submitted when some bytes are remaining,
164 	 *  but the I/O is bigger than that amount.  The excess will be deducted from the
165 	 *  next timeslice.
166 	 */
167 	int64_t byte_remaining_this_timeslice;
168 
169 	/** Poller that processes queued I/O commands each time slice. */
170 	struct spdk_poller *poller;
171 };
172 
173 struct spdk_bdev_mgmt_channel {
174 	bdev_io_stailq_t need_buf_small;
175 	bdev_io_stailq_t need_buf_large;
176 
177 	/*
178 	 * Each thread keeps a cache of bdev_io - this allows
179 	 *  bdev threads which are *not* DPDK threads to still
180 	 *  benefit from a per-thread bdev_io cache.  Without
181 	 *  this, non-DPDK threads fetching from the mempool
182 	 *  incur a cmpxchg on get and put.
183 	 */
184 	bdev_io_stailq_t per_thread_cache;
185 	uint32_t	per_thread_cache_count;
186 	uint32_t	bdev_io_cache_size;
187 
188 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
189 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
190 };
191 
192 /*
193  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
194  * queue their I/O awaiting retry here, which makes it possible to retry sending
195  * I/O to one bdev after I/O from another bdev completes.
196  */
197 struct spdk_bdev_shared_resource {
198 	/* The bdev management channel */
199 	struct spdk_bdev_mgmt_channel *mgmt_ch;
200 
201 	/*
202 	 * Count of I/O submitted to bdev module and waiting for completion.
203 	 * Incremented before submit_request() is called on an spdk_bdev_io.
204 	 */
205 	uint64_t		io_outstanding;
206 
207 	/*
208 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
209 	 *  on this channel.
210 	 */
211 	bdev_io_tailq_t		nomem_io;
212 
213 	/*
214 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
215 	 */
216 	uint64_t		nomem_threshold;
217 
218 	/* I/O channel allocated by a bdev module */
219 	struct spdk_io_channel	*shared_ch;
220 
221 	/* Refcount of bdev channels using this resource */
222 	uint32_t		ref;
223 
224 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
225 };
226 
227 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
228 #define BDEV_CH_QOS_ENABLED		(1 << 1)
229 
230 struct spdk_bdev_channel {
231 	struct spdk_bdev	*bdev;
232 
233 	/* The channel for the underlying device */
234 	struct spdk_io_channel	*channel;
235 
236 	/* Per io_device per thread data */
237 	struct spdk_bdev_shared_resource *shared_resource;
238 
239 	struct spdk_bdev_io_stat stat;
240 
241 	/*
242 	 * Count of I/O submitted through this channel and waiting for completion.
243 	 * Incremented before submit_request() is called on an spdk_bdev_io.
244 	 */
245 	uint64_t		io_outstanding;
246 
247 	bdev_io_tailq_t		queued_resets;
248 
249 	uint32_t		flags;
250 
251 #ifdef SPDK_CONFIG_VTUNE
252 	uint64_t		start_tsc;
253 	uint64_t		interval_tsc;
254 	__itt_string_handle	*handle;
255 	struct spdk_bdev_io_stat prev_stat;
256 #endif
257 
258 };
259 
260 struct spdk_bdev_desc {
261 	struct spdk_bdev		*bdev;
262 	spdk_bdev_remove_cb_t		remove_cb;
263 	void				*remove_ctx;
264 	bool				remove_scheduled;
265 	bool				write;
266 	TAILQ_ENTRY(spdk_bdev_desc)	link;
267 };
268 
269 struct spdk_bdev_iostat_ctx {
270 	struct spdk_bdev_io_stat *stat;
271 	spdk_bdev_get_device_stat_cb cb;
272 	void *cb_arg;
273 };
274 
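/*
 * Editorial note (not part of the original source): each bdev is registered as an
 * io_device keyed by the address one byte past the start of its spdk_bdev structure.
 * Presumably this keeps the io_device handle distinct from the bdev pointer itself,
 * which a bdev module might already have registered as its own io_device.  The two
 * macros below must remain exact inverses of each other.
 */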
275 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
276 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
277 
278 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
279 		void *cb_arg);
280 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
281 
282 void
283 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
284 {
285 	*opts = g_bdev_opts;
286 }
287 
288 int
289 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
290 {
291 	uint32_t min_pool_size;
292 
293 	/*
294 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
295 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
296 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
297 	 */
298 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
299 	if (opts->bdev_io_pool_size < min_pool_size) {
300 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
301 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
302 			    spdk_thread_get_count());
303 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
304 		return -1;
305 	}
306 
307 	g_bdev_opts = *opts;
308 	return 0;
309 }
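/*
 * Illustrative usage (not part of the original source): an application that wants a
 * larger bdev_io pool would typically read-modify-write the global options before
 * calling spdk_bdev_initialize(), e.g.:
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 128 * 1024;	// hypothetical value
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		// rejected: pool too small for bdev_io_cache_size * (thread count + 1)
 *	}
 *
 * The check above rejects combinations where the per-thread caches alone could
 * exhaust the pool.
 */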
310 
311 struct spdk_bdev *
312 spdk_bdev_first(void)
313 {
314 	struct spdk_bdev *bdev;
315 
316 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
317 	if (bdev) {
318 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
319 	}
320 
321 	return bdev;
322 }
323 
324 struct spdk_bdev *
325 spdk_bdev_next(struct spdk_bdev *prev)
326 {
327 	struct spdk_bdev *bdev;
328 
329 	bdev = TAILQ_NEXT(prev, internal.link);
330 	if (bdev) {
331 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
332 	}
333 
334 	return bdev;
335 }
336 
337 static struct spdk_bdev *
338 _bdev_next_leaf(struct spdk_bdev *bdev)
339 {
340 	while (bdev != NULL) {
341 		if (bdev->internal.claim_module == NULL) {
342 			return bdev;
343 		} else {
344 			bdev = TAILQ_NEXT(bdev, internal.link);
345 		}
346 	}
347 
348 	return bdev;
349 }
350 
351 struct spdk_bdev *
352 spdk_bdev_first_leaf(void)
353 {
354 	struct spdk_bdev *bdev;
355 
356 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
357 
358 	if (bdev) {
359 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
360 	}
361 
362 	return bdev;
363 }
364 
365 struct spdk_bdev *
366 spdk_bdev_next_leaf(struct spdk_bdev *prev)
367 {
368 	struct spdk_bdev *bdev;
369 
370 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
371 
372 	if (bdev) {
373 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
374 	}
375 
376 	return bdev;
377 }
378 
379 struct spdk_bdev *
380 spdk_bdev_get_by_name(const char *bdev_name)
381 {
382 	struct spdk_bdev_alias *tmp;
383 	struct spdk_bdev *bdev = spdk_bdev_first();
384 
385 	while (bdev != NULL) {
386 		if (strcmp(bdev_name, bdev->name) == 0) {
387 			return bdev;
388 		}
389 
390 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
391 			if (strcmp(bdev_name, tmp->alias) == 0) {
392 				return bdev;
393 			}
394 		}
395 
396 		bdev = spdk_bdev_next(bdev);
397 	}
398 
399 	return NULL;
400 }
401 
402 void
403 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
404 {
405 	struct iovec *iovs;
406 
407 	iovs = bdev_io->u.bdev.iovs;
408 
409 	assert(iovs != NULL);
410 	assert(bdev_io->u.bdev.iovcnt >= 1);
411 
412 	iovs[0].iov_base = buf;
413 	iovs[0].iov_len = len;
414 }
415 
416 static void
417 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
418 {
419 	struct spdk_mempool *pool;
420 	struct spdk_bdev_io *tmp;
421 	void *buf, *aligned_buf;
422 	bdev_io_stailq_t *stailq;
423 	struct spdk_bdev_mgmt_channel *ch;
424 
425 	assert(bdev_io->u.bdev.iovcnt == 1);
426 
427 	buf = bdev_io->internal.buf;
428 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
429 
430 	bdev_io->internal.buf = NULL;
431 
432 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
433 		pool = g_bdev_mgr.buf_small_pool;
434 		stailq = &ch->need_buf_small;
435 	} else {
436 		pool = g_bdev_mgr.buf_large_pool;
437 		stailq = &ch->need_buf_large;
438 	}
439 
440 	if (STAILQ_EMPTY(stailq)) {
441 		spdk_mempool_put(pool, buf);
442 	} else {
443 		tmp = STAILQ_FIRST(stailq);
444 
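		/*
		 * The buffer pools are created with an extra 512 bytes per element (see
		 * spdk_bdev_initialize()), so rounding the raw mempool pointer up to the
		 * next 512-byte boundary here always stays within the element.
		 */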
445 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
446 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, tmp->internal.buf_len);
447 
448 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
449 		tmp->internal.buf = buf;
450 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
451 	}
452 }
453 
454 void
455 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
456 {
457 	struct spdk_mempool *pool;
458 	bdev_io_stailq_t *stailq;
459 	void *buf, *aligned_buf;
460 	struct spdk_bdev_mgmt_channel *mgmt_ch;
461 
462 	assert(cb != NULL);
463 	assert(bdev_io->u.bdev.iovs != NULL);
464 
465 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
466 		/* Buffer already present */
467 		cb(bdev_io->internal.ch->channel, bdev_io);
468 		return;
469 	}
470 
471 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
472 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
473 
474 	bdev_io->internal.buf_len = len;
475 	bdev_io->internal.get_buf_cb = cb;
476 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
477 		pool = g_bdev_mgr.buf_small_pool;
478 		stailq = &mgmt_ch->need_buf_small;
479 	} else {
480 		pool = g_bdev_mgr.buf_large_pool;
481 		stailq = &mgmt_ch->need_buf_large;
482 	}
483 
484 	buf = spdk_mempool_get(pool);
485 
486 	if (!buf) {
487 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
488 	} else {
489 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
490 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
491 
492 		bdev_io->internal.buf = buf;
493 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
494 	}
495 }
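/*
 * Illustrative usage (not part of the original source): a bdev module that needs a
 * data buffer for a READ typically calls spdk_bdev_io_get_buf() from its
 * submit_request path and performs the actual read from the callback, e.g.:
 *
 *	static void
 *	my_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// buffer is now available in bdev_io->u.bdev.iovs[0]
 *		my_bdev_do_read(ch, bdev_io);	// hypothetical helper
 *	}
 *
 *	...
 *	spdk_bdev_io_get_buf(bdev_io, my_bdev_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * If no buffer is available, the request is parked on the need_buf_* queue above and
 * the callback fires later, when another I/O returns a buffer via
 * spdk_bdev_io_put_buf().
 */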
496 
497 static int
498 spdk_bdev_module_get_max_ctx_size(void)
499 {
500 	struct spdk_bdev_module *bdev_module;
501 	int max_bdev_module_size = 0;
502 
503 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
504 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
505 			max_bdev_module_size = bdev_module->get_ctx_size();
506 		}
507 	}
508 
509 	return max_bdev_module_size;
510 }
511 
512 void
513 spdk_bdev_config_text(FILE *fp)
514 {
515 	struct spdk_bdev_module *bdev_module;
516 
517 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
518 		if (bdev_module->config_text) {
519 			bdev_module->config_text(fp);
520 		}
521 	}
522 }
523 
524 void
525 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
526 {
527 	struct spdk_bdev_module *bdev_module;
528 	struct spdk_bdev *bdev;
529 
530 	assert(w != NULL);
531 
532 	spdk_json_write_array_begin(w);
533 
534 	spdk_json_write_object_begin(w);
535 	spdk_json_write_named_string(w, "method", "set_bdev_options");
536 	spdk_json_write_name(w, "params");
537 	spdk_json_write_object_begin(w);
538 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
539 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
540 	spdk_json_write_object_end(w);
541 	spdk_json_write_object_end(w);
542 
543 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
544 		if (bdev_module->config_json) {
545 			bdev_module->config_json(w);
546 		}
547 	}
548 
549 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
550 		spdk_bdev_config_json(bdev, w);
551 	}
552 
553 	spdk_json_write_array_end(w);
554 }
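/*
 * Illustrative output (not part of the original source): with the default options the
 * first array element written above would look roughly like
 *
 *	{
 *		"method": "set_bdev_options",
 *		"params": {
 *			"bdev_io_pool_size": 65536,
 *			"bdev_io_cache_size": 256
 *		}
 *	}
 *
 * followed by one entry per module config_json() and one per registered bdev.
 */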
555 
556 static int
557 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
558 {
559 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
560 	struct spdk_bdev_io *bdev_io;
561 	uint32_t i;
562 
563 	STAILQ_INIT(&ch->need_buf_small);
564 	STAILQ_INIT(&ch->need_buf_large);
565 
566 	STAILQ_INIT(&ch->per_thread_cache);
567 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
568 
569 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
570 	ch->per_thread_cache_count = 0;
571 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
572 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
573 		assert(bdev_io != NULL);
574 		ch->per_thread_cache_count++;
575 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
576 	}
577 
578 	TAILQ_INIT(&ch->shared_resources);
579 	TAILQ_INIT(&ch->io_wait_queue);
580 
581 	return 0;
582 }
583 
584 static void
585 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
586 {
587 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
588 	struct spdk_bdev_io *bdev_io;
589 
590 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
591 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
592 	}
593 
594 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
595 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
596 	}
597 
598 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
599 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
600 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
601 		ch->per_thread_cache_count--;
602 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
603 	}
604 
605 	assert(ch->per_thread_cache_count == 0);
606 }
607 
608 static void
609 spdk_bdev_init_complete(int rc)
610 {
611 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
612 	void *cb_arg = g_init_cb_arg;
613 	struct spdk_bdev_module *m;
614 
615 	g_bdev_mgr.init_complete = true;
616 	g_init_cb_fn = NULL;
617 	g_init_cb_arg = NULL;
618 
619 	/*
620 	 * For modules that need to know when subsystem init is complete,
621 	 * inform them now.
622 	 */
623 	if (rc == 0) {
624 		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
625 			if (m->init_complete) {
626 				m->init_complete();
627 			}
628 		}
629 	}
630 
631 	cb_fn(cb_arg, rc);
632 }
633 
634 static void
635 spdk_bdev_module_action_complete(void)
636 {
637 	struct spdk_bdev_module *m;
638 
639 	/*
640 	 * Don't finish bdev subsystem initialization if
641 	 * module pre-initialization is still in progress, or
642 	 * the subsystem been already initialized.
643 	 * the subsystem has already been initialized.
644 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
645 		return;
646 	}
647 
648 	/*
649 	 * Check all bdev modules for inits/examinations in progress. If any
650 	 * exist, return immediately since we cannot finish bdev subsystem
651 	 * initialization until all are completed.
652 	 */
653 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
654 		if (m->internal.action_in_progress > 0) {
655 			return;
656 		}
657 	}
658 
659 	/*
660 	 * Modules already finished initialization - now that all
661 	 * the bdev modules have finished their asynchronous I/O
662 	 * processing, the entire bdev layer can be marked as complete.
663 	 */
664 	spdk_bdev_init_complete(0);
665 }
666 
667 static void
668 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
669 {
670 	assert(module->internal.action_in_progress > 0);
671 	module->internal.action_in_progress--;
672 	spdk_bdev_module_action_complete();
673 }
674 
675 void
676 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
677 {
678 	spdk_bdev_module_action_done(module);
679 }
680 
681 void
682 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
683 {
684 	spdk_bdev_module_action_done(module);
685 }
686 
687 static int
688 spdk_bdev_modules_init(void)
689 {
690 	struct spdk_bdev_module *module;
691 	int rc = 0;
692 
693 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
694 		rc = module->module_init();
695 		if (rc != 0) {
696 			break;
697 		}
698 	}
699 
700 	g_bdev_mgr.module_init_complete = true;
701 	return rc;
702 }
703 
704 
705 static void
706 spdk_bdev_init_failed_complete(void *cb_arg)
707 {
708 	spdk_bdev_init_complete(-1);
709 }
710 
711 static void
712 spdk_bdev_init_failed(void *cb_arg)
713 {
714 	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
715 }
716 
717 void
718 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
719 {
720 	struct spdk_conf_section *sp;
721 	struct spdk_bdev_opts bdev_opts;
722 	int32_t bdev_io_pool_size, bdev_io_cache_size;
723 	int cache_size;
724 	int rc = 0;
725 	char mempool_name[32];
726 
727 	assert(cb_fn != NULL);
728 
729 	sp = spdk_conf_find_section(NULL, "Bdev");
730 	if (sp != NULL) {
731 		spdk_bdev_get_opts(&bdev_opts);
732 
733 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
734 		if (bdev_io_pool_size >= 0) {
735 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
736 		}
737 
738 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
739 		if (bdev_io_cache_size >= 0) {
740 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
741 		}
742 
743 		if (spdk_bdev_set_opts(&bdev_opts)) {
744 			spdk_bdev_init_complete(-1);
745 			return;
746 		}
747 
748 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
749 	}
750 
751 	g_init_cb_fn = cb_fn;
752 	g_init_cb_arg = cb_arg;
753 
754 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
755 
756 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
757 				  g_bdev_opts.bdev_io_pool_size,
758 				  sizeof(struct spdk_bdev_io) +
759 				  spdk_bdev_module_get_max_ctx_size(),
760 				  0,
761 				  SPDK_ENV_SOCKET_ID_ANY);
762 
763 	if (g_bdev_mgr.bdev_io_pool == NULL) {
764 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
765 		spdk_bdev_init_complete(-1);
766 		return;
767 	}
768 
769 	/**
770 	 * Ensure no more than half of the total buffers end up in local caches, by
771 	 *   using spdk_thread_get_count() to determine how many local caches we need
772 	 *   to account for.
773 	 */
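	/*
	 * Worked example (illustrative): with BUF_SMALL_POOL_SIZE = 8192 and, say, 4
	 * reactor threads, cache_size = 8192 / (2 * 4) = 1024 buffers cached per thread,
	 * leaving at least half of the pool in the shared mempool.
	 */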
774 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
775 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
776 
777 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
778 				    BUF_SMALL_POOL_SIZE,
779 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
780 				    cache_size,
781 				    SPDK_ENV_SOCKET_ID_ANY);
782 	if (!g_bdev_mgr.buf_small_pool) {
783 		SPDK_ERRLOG("create rbuf small pool failed\n");
784 		spdk_bdev_init_complete(-1);
785 		return;
786 	}
787 
788 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
789 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
790 
791 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
792 				    BUF_LARGE_POOL_SIZE,
793 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
794 				    cache_size,
795 				    SPDK_ENV_SOCKET_ID_ANY);
796 	if (!g_bdev_mgr.buf_large_pool) {
797 		SPDK_ERRLOG("create rbuf large pool failed\n");
798 		spdk_bdev_init_complete(-1);
799 		return;
800 	}
801 
802 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
803 				 NULL);
804 	if (!g_bdev_mgr.zero_buffer) {
805 		SPDK_ERRLOG("create bdev zero buffer failed\n");
806 		spdk_bdev_init_complete(-1);
807 		return;
808 	}
809 
810 #ifdef SPDK_CONFIG_VTUNE
811 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
812 #endif
813 
814 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
815 				spdk_bdev_mgmt_channel_destroy,
816 				sizeof(struct spdk_bdev_mgmt_channel));
817 
818 	rc = spdk_bdev_modules_init();
819 	if (rc != 0) {
820 		SPDK_ERRLOG("bdev modules init failed\n");
821 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
822 		return;
823 	}
824 
825 	spdk_bdev_module_action_complete();
826 }
827 
828 static void
829 spdk_bdev_mgr_unregister_cb(void *io_device)
830 {
831 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
832 
833 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
834 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
835 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
836 			    g_bdev_opts.bdev_io_pool_size);
837 	}
838 
839 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
840 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
841 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
842 			    BUF_SMALL_POOL_SIZE);
843 		assert(false);
844 	}
845 
846 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
847 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
848 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
849 			    BUF_LARGE_POOL_SIZE);
850 		assert(false);
851 	}
852 
853 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
854 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
855 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
856 	spdk_dma_free(g_bdev_mgr.zero_buffer);
857 
858 	cb_fn(g_fini_cb_arg);
859 	g_fini_cb_fn = NULL;
860 	g_fini_cb_arg = NULL;
861 }
862 
863 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
864 
865 static void
866 spdk_bdev_module_finish_iter(void *arg)
867 {
868 	struct spdk_bdev_module *bdev_module;
869 
870 	/* Start iterating from the last touched module */
871 	if (!g_resume_bdev_module) {
872 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
873 	} else {
874 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
875 	}
876 
877 	while (bdev_module) {
878 		if (bdev_module->async_fini) {
879 			/* Save our place so we can resume later. We must
880 			 * save the variable here, before calling module_fini()
881 			 * below, because in some cases the module may immediately
882 			 * call spdk_bdev_module_finish_done() and re-enter
883 			 * this function to continue iterating. */
884 			g_resume_bdev_module = bdev_module;
885 		}
886 
887 		if (bdev_module->module_fini) {
888 			bdev_module->module_fini();
889 		}
890 
891 		if (bdev_module->async_fini) {
892 			return;
893 		}
894 
895 		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
896 	}
897 
898 	g_resume_bdev_module = NULL;
899 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
900 }
901 
902 void
903 spdk_bdev_module_finish_done(void)
904 {
905 	if (spdk_get_thread() != g_fini_thread) {
906 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
907 	} else {
908 		spdk_bdev_module_finish_iter(NULL);
909 	}
910 }
911 
912 static void
913 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
914 {
915 	struct spdk_bdev *bdev = cb_arg;
916 
917 	if (bdeverrno && bdev) {
918 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
919 			     bdev->name);
920 
921 		/*
922 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
923 		 *  bdev; try to continue by manually removing this bdev from the list and moving
924 		 *  on to the next bdev in the list.
925 		 */
926 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
927 	}
928 
929 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
930 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
931 		/*
932 		 * Bdev module finish needs to be deferred, as we might be in the middle of some context
933 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
934 		 * after returning.
935 		 */
936 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
937 		return;
938 	}
939 
940 	/*
941 	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
942 	 * that has no bdevs that depend on it.
943 	 */
944 	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
945 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
946 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
947 }
948 
949 void
950 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
951 {
952 	struct spdk_bdev_module *m;
953 
954 	assert(cb_fn != NULL);
955 
956 	g_fini_thread = spdk_get_thread();
957 
958 	g_fini_cb_fn = cb_fn;
959 	g_fini_cb_arg = cb_arg;
960 
961 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
962 		if (m->fini_start) {
963 			m->fini_start();
964 		}
965 	}
966 
967 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
968 }
969 
970 static struct spdk_bdev_io *
971 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
972 {
973 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
974 	struct spdk_bdev_io *bdev_io;
975 
976 	if (ch->per_thread_cache_count > 0) {
977 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
978 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
979 		ch->per_thread_cache_count--;
980 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
981 		/*
982 		 * Don't try to look for bdev_ios in the global pool if there are
983 		 * waiters on bdev_ios - we don't want this caller to jump the line.
984 		 */
985 		bdev_io = NULL;
986 	} else {
987 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
988 	}
989 
990 	return bdev_io;
991 }
992 
993 void
994 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
995 {
996 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
997 
998 	assert(bdev_io != NULL);
999 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
1000 
1001 	if (bdev_io->internal.buf != NULL) {
1002 		spdk_bdev_io_put_buf(bdev_io);
1003 	}
1004 
1005 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
1006 		ch->per_thread_cache_count++;
1007 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
1008 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1009 			struct spdk_bdev_io_wait_entry *entry;
1010 
1011 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1012 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1013 			entry->cb_fn(entry->cb_arg);
1014 		}
1015 	} else {
1016 		/* We should never have a full cache with entries on the io wait queue. */
1017 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1018 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1019 	}
1020 }
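/*
 * Illustrative usage (not part of the original source): when the bdev_io pool or a
 * bdev module runs out of resources, submission APIs return -ENOMEM and the caller
 * can park itself on the per-thread io_wait_queue drained above, e.g.:
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset, num_blocks, read_done, ctx);
 *	if (rc == -ENOMEM) {
 *		wait_entry.bdev = bdev;			// caller-owned spdk_bdev_io_wait_entry
 *		wait_entry.cb_fn = resubmit_read;	// hypothetical retry callback
 *		wait_entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, io_ch, &wait_entry);
 *	}
 *
 * The same pattern is used internally by _spdk_bdev_io_split_with_payload() below.
 */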
1021 
1022 static uint64_t
1023 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1024 {
1025 	struct spdk_bdev	*bdev = bdev_io->bdev;
1026 
1027 	switch (bdev_io->type) {
1028 	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
1029 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1030 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1031 		return bdev_io->u.nvme_passthru.nbytes;
1032 	case SPDK_BDEV_IO_TYPE_READ:
1033 	case SPDK_BDEV_IO_TYPE_WRITE:
1034 	case SPDK_BDEV_IO_TYPE_UNMAP:
1035 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1036 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1037 	default:
1038 		return 0;
1039 	}
1040 }
1041 
1042 static void
1043 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
1044 {
1045 	struct spdk_bdev_io		*bdev_io = NULL;
1046 	struct spdk_bdev		*bdev = ch->bdev;
1047 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
1048 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1049 
1050 	while (!TAILQ_EMPTY(&qos->queued)) {
1051 		if (qos->max_ios_per_timeslice > 0 && qos->io_remaining_this_timeslice == 0) {
1052 			break;
1053 		}
1054 
1055 		if (qos->max_byte_per_timeslice > 0 && qos->byte_remaining_this_timeslice <= 0) {
1056 			break;
1057 		}
1058 
1059 		bdev_io = TAILQ_FIRST(&qos->queued);
1060 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1061 		qos->io_remaining_this_timeslice--;
1062 		qos->byte_remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(bdev_io);
1063 		ch->io_outstanding++;
1064 		shared_resource->io_outstanding++;
1065 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1066 	}
1067 }
1068 
1069 static bool
1070 _spdk_bdev_io_type_can_split(uint8_t type)
1071 {
1072 	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1073 	assert(type < SPDK_BDEV_NUM_IO_TYPES);
1074 
1075 	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
1076 	 * UNMAP could be split, but they are typically much larger in size (sometimes
1077 	 * the size of the entire block device), the bdev module can split them more
1078 	 * efficiently itself, and they carry no payload, which makes the splitting
1079 	 * process simpler.
1080 	 */
1081 	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1082 		return true;
1083 	} else {
1084 		return false;
1085 	}
1086 }
1087 
1088 static bool
1089 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1090 {
1091 	uint64_t start_stripe, end_stripe;
1092 	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1093 
1094 	if (io_boundary == 0) {
1095 		return false;
1096 	}
1097 
1098 	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1099 		return false;
1100 	}
1101 
1102 	start_stripe = bdev_io->u.bdev.offset_blocks;
1103 	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1104 	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
1105 	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1106 		start_stripe >>= spdk_u32log2(io_boundary);
1107 		end_stripe >>= spdk_u32log2(io_boundary);
1108 	} else {
1109 		start_stripe /= io_boundary;
1110 		end_stripe /= io_boundary;
1111 	}
1112 	return (start_stripe != end_stripe);
1113 }
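/*
 * Worked example (illustrative): with optimal_io_boundary = 8 blocks (a power of two),
 * an I/O at offset_blocks = 6 with num_blocks = 4 covers blocks 6..9, so
 * start_stripe = 6 >> 3 = 0 and end_stripe = 9 >> 3 = 1; the stripes differ and the
 * I/O will be split at the boundary.
 */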
1114 
1115 static uint32_t
1116 _to_next_boundary(uint64_t offset, uint32_t boundary)
1117 {
1118 	return (boundary - (offset % boundary));
1119 }
1120 
1121 static void
1122 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1123 
1124 static void
1125 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1126 {
1127 	struct spdk_bdev_io *bdev_io = _bdev_io;
1128 	uint64_t current_offset, remaining, bytes_handled;
1129 	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1130 	struct iovec *parent_iov;
1131 	uint64_t parent_iov_offset, child_iov_len;
1132 	uint32_t child_iovcnt;
1133 	int rc;
1134 
1135 	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1136 	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1137 	blocklen = bdev_io->bdev->blocklen;
1138 	bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1139 	parent_iov = &bdev_io->u.bdev.iovs[0];
1140 	parent_iov_offset = 0;
1141 
1142 	while (bytes_handled > 0) {
1143 		if (bytes_handled >= parent_iov->iov_len) {
1144 			bytes_handled -= parent_iov->iov_len;
1145 			parent_iov++;
1146 			continue;
1147 		}
1148 		parent_iov_offset += bytes_handled;
1149 		break;
1150 	}
1151 
1152 	to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1153 	to_next_boundary = spdk_min(remaining, to_next_boundary);
1154 	to_next_boundary_bytes = to_next_boundary * blocklen;
1155 	child_iovcnt = 0;
1156 	while (to_next_boundary_bytes > 0) {
1157 		child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1158 		to_next_boundary_bytes -= child_iov_len;
1159 
1160 		bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1161 		bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len;
1162 
1163 		parent_iov++;
1164 		parent_iov_offset = 0;
1165 		child_iovcnt++;
1166 		if (child_iovcnt == BDEV_IO_NUM_CHILD_IOV && to_next_boundary_bytes > 0) {
1167 			/* We've run out of child iovs - we need to fail this I/O. */
1168 			bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1169 			bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED,
1170 					     bdev_io->internal.caller_ctx);
1171 			return;
1172 		}
1173 	}
1174 
1175 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1176 		rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1177 					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
1178 					    bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1179 					    _spdk_bdev_io_split_done, bdev_io);
1180 	} else {
1181 		rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1182 					     spdk_io_channel_from_ctx(bdev_io->internal.ch),
1183 					     bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1184 					     _spdk_bdev_io_split_done, bdev_io);
1185 	}
1186 
1187 	if (rc == 0) {
1188 		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
1189 		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
1190 	} else {
1191 		assert(rc == -ENOMEM);
1192 		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1193 		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_with_payload;
1194 		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1195 		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1196 					&bdev_io->internal.waitq_entry);
1197 	}
1198 }
1199 
1200 static void
1201 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1202 {
1203 	struct spdk_bdev_io *parent_io = cb_arg;
1204 
1205 	spdk_bdev_free_io(bdev_io);
1206 
1207 	if (!success) {
1208 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1209 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx);
1210 		return;
1211 	}
1212 
1213 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
1214 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1215 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx);
1216 		return;
1217 	}
1218 
1219 	/*
1220 	 * Continue with the splitting process.  This function will complete the parent I/O if the
1221 	 * splitting is done.
1222 	 */
1223 	_spdk_bdev_io_split_with_payload(parent_io);
1224 }
1225 
1226 static void
1227 _spdk_bdev_io_split(struct spdk_bdev_io *bdev_io)
1228 {
1229 	assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1230 
1231 	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1232 	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1233 
1234 	_spdk_bdev_io_split_with_payload(bdev_io);
1235 }
1236 
1237 static void
1238 _spdk_bdev_io_submit(void *ctx)
1239 {
1240 	struct spdk_bdev_io *bdev_io = ctx;
1241 	struct spdk_bdev *bdev = bdev_io->bdev;
1242 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1243 	struct spdk_io_channel *ch = bdev_ch->channel;
1244 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1245 	uint64_t tsc;
1246 
1247 	tsc = spdk_get_ticks();
1248 	bdev_io->internal.submit_tsc = tsc;
1249 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
1250 	bdev_ch->io_outstanding++;
1251 	shared_resource->io_outstanding++;
1252 	bdev_io->internal.in_submit_request = true;
1253 	if (spdk_likely(bdev_ch->flags == 0)) {
1254 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1255 			bdev->fn_table->submit_request(ch, bdev_io);
1256 		} else {
1257 			bdev_ch->io_outstanding--;
1258 			shared_resource->io_outstanding--;
1259 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1260 		}
1261 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1262 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1263 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1264 		bdev_ch->io_outstanding--;
1265 		shared_resource->io_outstanding--;
1266 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1267 		_spdk_bdev_qos_io_submit(bdev_ch);
1268 	} else {
1269 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1270 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1271 	}
1272 	bdev_io->internal.in_submit_request = false;
1273 }
1274 
1275 static void
1276 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1277 {
1278 	struct spdk_bdev *bdev = bdev_io->bdev;
1279 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1280 
1281 	assert(thread != NULL);
1282 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1283 
1284 	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1285 		_spdk_bdev_io_split(bdev_io);
1286 		return;
1287 	}
1288 
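	/*
	 * If QoS is enabled, all I/O for this bdev is funneled through the single QoS
	 * channel so that one poller can enforce the rate limits.  I/O submitted on any
	 * other thread saves its original channel in internal.io_submit_ch and is
	 * forwarded to the QoS thread with spdk_thread_send_msg().
	 */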
1289 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1290 		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1291 			_spdk_bdev_io_submit(bdev_io);
1292 		} else {
1293 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1294 			bdev_io->internal.ch = bdev->internal.qos->ch;
1295 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1296 		}
1297 	} else {
1298 		_spdk_bdev_io_submit(bdev_io);
1299 	}
1300 }
1301 
1302 static void
1303 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1304 {
1305 	struct spdk_bdev *bdev = bdev_io->bdev;
1306 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1307 	struct spdk_io_channel *ch = bdev_ch->channel;
1308 
1309 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1310 
1311 	bdev_io->internal.in_submit_request = true;
1312 	bdev->fn_table->submit_request(ch, bdev_io);
1313 	bdev_io->internal.in_submit_request = false;
1314 }
1315 
1316 static void
1317 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1318 		  struct spdk_bdev *bdev, void *cb_arg,
1319 		  spdk_bdev_io_completion_cb cb)
1320 {
1321 	bdev_io->bdev = bdev;
1322 	bdev_io->internal.caller_ctx = cb_arg;
1323 	bdev_io->internal.cb = cb;
1324 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1325 	bdev_io->internal.in_submit_request = false;
1326 	bdev_io->internal.buf = NULL;
1327 	bdev_io->internal.io_submit_ch = NULL;
1328 }
1329 
1330 static bool
1331 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1332 {
1333 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1334 }
1335 
1336 bool
1337 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1338 {
1339 	bool supported;
1340 
1341 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1342 
1343 	if (!supported) {
1344 		switch (io_type) {
1345 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1346 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1347 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1348 			break;
1349 		default:
1350 			break;
1351 		}
1352 	}
1353 
1354 	return supported;
1355 }
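/*
 * Illustrative usage (not part of the original source): callers typically probe for
 * optional I/O types before issuing them, e.g.:
 *
 *	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
 *		// safe to issue spdk_bdev_unmap_blocks() on this bdev
 *	}
 *
 * Note that WRITE_ZEROES reports as supported whenever plain WRITE is, because the
 * bdev layer emulates it with the shared zero buffer.
 */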
1356 
1357 int
1358 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1359 {
1360 	if (bdev->fn_table->dump_info_json) {
1361 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1362 	}
1363 
1364 	return 0;
1365 }
1366 
1367 void
1368 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1369 {
1370 	assert(bdev != NULL);
1371 	assert(w != NULL);
1372 
1373 	if (bdev->fn_table->write_config_json) {
1374 		bdev->fn_table->write_config_json(bdev, w);
1375 	} else {
1376 		spdk_json_write_object_begin(w);
1377 		spdk_json_write_named_string(w, "name", bdev->name);
1378 		spdk_json_write_object_end(w);
1379 	}
1380 }
1381 
1382 static void
1383 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1384 {
1385 	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;
1386 
1387 	if (qos->iops_rate_limit > 0) {
1388 		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1389 					SPDK_SEC_TO_USEC;
1390 		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
1391 						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1392 	}
1393 
1394 	if (qos->byte_rate_limit > 0) {
1395 		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1396 					 SPDK_SEC_TO_USEC;
1397 		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
1398 						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
1399 	}
1400 }
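/*
 * Worked example (illustrative): with the 1000 usec timeslice, an iops_rate_limit of
 * 10000 yields 10000 * 1000 / 1000000 = 10 I/Os per timeslice, and a byte_rate_limit
 * of 10 MiB/s yields 10485760 * 1000 / 1000000 = 10485 bytes per timeslice; both are
 * then floored at SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE and
 * SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE respectively.
 */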
1401 
1402 static int
1403 spdk_bdev_channel_poll_qos(void *arg)
1404 {
1405 	struct spdk_bdev_qos *qos = arg;
1406 	uint64_t now = spdk_get_ticks();
1407 
1408 	if (now < (qos->last_timeslice + qos->timeslice_size)) {
1409 		/* We received our callback earlier than expected - return
1410 		 *  immediately and wait to do accounting until at least one
1411 		 *  timeslice has actually expired.  This should never happen
1412 		 *  with a well-behaved timer implementation.
1413 		 */
1414 		return 0;
1415 	}
1416 
1417 	/* Reset for next round of rate limiting */
1418 	qos->io_remaining_this_timeslice = 0;
1419 	/* We may have allowed the bytes to slightly overrun in the last timeslice.
1420 	 * byte_remaining_this_timeslice is signed, so if it's negative here, we'll
1421 	 * account for the overrun so that the next timeslice will be appropriately
1422 	 * reduced.
1423 	 */
1424 	if (qos->byte_remaining_this_timeslice > 0) {
1425 		qos->byte_remaining_this_timeslice = 0;
1426 	}
1427 
1428 	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
1429 		qos->last_timeslice += qos->timeslice_size;
1430 		qos->io_remaining_this_timeslice += qos->max_ios_per_timeslice;
1431 		qos->byte_remaining_this_timeslice += qos->max_byte_per_timeslice;
1432 	}
1433 
1434 	_spdk_bdev_qos_io_submit(qos->ch);
1435 
1436 	return -1;
1437 }
1438 
1439 static void
1440 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1441 {
1442 	struct spdk_bdev_shared_resource *shared_resource;
1443 
1444 	if (!ch) {
1445 		return;
1446 	}
1447 
1448 	if (ch->channel) {
1449 		spdk_put_io_channel(ch->channel);
1450 	}
1451 
1452 	assert(ch->io_outstanding == 0);
1453 
1454 	shared_resource = ch->shared_resource;
1455 	if (shared_resource) {
1456 		assert(ch->io_outstanding == 0);
1457 		assert(shared_resource->ref > 0);
1458 		shared_resource->ref--;
1459 		if (shared_resource->ref == 0) {
1460 			assert(shared_resource->io_outstanding == 0);
1461 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1462 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1463 			free(shared_resource);
1464 		}
1465 	}
1466 }
1467 
1468 /* Caller must hold bdev->internal.mutex. */
1469 static void
1470 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1471 {
1472 	struct spdk_bdev_qos *qos = bdev->internal.qos;
1473 
1474 	/* Rate limiting is enabled on this bdev */
1475 	if (qos) {
1476 		if (qos->ch == NULL) {
1477 			struct spdk_io_channel *io_ch;
1478 
1479 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1480 				      bdev->name, spdk_get_thread());
1481 
1482 			/* No qos channel has been selected, so set one up */
1483 
1484 			/* Take another reference to ch */
1485 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1486 			qos->ch = ch;
1487 
1488 			qos->thread = spdk_io_channel_get_thread(io_ch);
1489 
1490 			TAILQ_INIT(&qos->queued);
1491 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1492 			qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice;
1493 			qos->byte_remaining_this_timeslice = qos->max_byte_per_timeslice;
1494 			qos->timeslice_size =
1495 				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
1496 			qos->last_timeslice = spdk_get_ticks();
1497 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1498 							   qos,
1499 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1500 		}
1501 
1502 		ch->flags |= BDEV_CH_QOS_ENABLED;
1503 	}
1504 }
1505 
1506 static int
1507 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1508 {
1509 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1510 	struct spdk_bdev_channel	*ch = ctx_buf;
1511 	struct spdk_io_channel		*mgmt_io_ch;
1512 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1513 	struct spdk_bdev_shared_resource *shared_resource;
1514 
1515 	ch->bdev = bdev;
1516 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1517 	if (!ch->channel) {
1518 		return -1;
1519 	}
1520 
1521 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1522 	if (!mgmt_io_ch) {
1523 		return -1;
1524 	}
1525 
1526 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1527 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1528 		if (shared_resource->shared_ch == ch->channel) {
1529 			spdk_put_io_channel(mgmt_io_ch);
1530 			shared_resource->ref++;
1531 			break;
1532 		}
1533 	}
1534 
1535 	if (shared_resource == NULL) {
1536 		shared_resource = calloc(1, sizeof(*shared_resource));
1537 		if (shared_resource == NULL) {
1538 			spdk_put_io_channel(mgmt_io_ch);
1539 			return -1;
1540 		}
1541 
1542 		shared_resource->mgmt_ch = mgmt_ch;
1543 		shared_resource->io_outstanding = 0;
1544 		TAILQ_INIT(&shared_resource->nomem_io);
1545 		shared_resource->nomem_threshold = 0;
1546 		shared_resource->shared_ch = ch->channel;
1547 		shared_resource->ref = 1;
1548 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1549 	}
1550 
1551 	memset(&ch->stat, 0, sizeof(ch->stat));
1552 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1553 	ch->io_outstanding = 0;
1554 	TAILQ_INIT(&ch->queued_resets);
1555 	ch->flags = 0;
1556 	ch->shared_resource = shared_resource;
1557 
1558 #ifdef SPDK_CONFIG_VTUNE
1559 	{
1560 		char *name;
1561 		__itt_init_ittlib(NULL, 0);
1562 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1563 		if (!name) {
1564 			_spdk_bdev_channel_destroy_resource(ch);
1565 			return -1;
1566 		}
1567 		ch->handle = __itt_string_handle_create(name);
1568 		free(name);
1569 		ch->start_tsc = spdk_get_ticks();
1570 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1571 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1572 	}
1573 #endif
1574 
1575 	pthread_mutex_lock(&bdev->internal.mutex);
1576 	_spdk_bdev_enable_qos(bdev, ch);
1577 	pthread_mutex_unlock(&bdev->internal.mutex);
1578 
1579 	return 0;
1580 }
1581 
1582 /*
1583  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1584  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1585  */
1586 static void
1587 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1588 {
1589 	bdev_io_stailq_t tmp;
1590 	struct spdk_bdev_io *bdev_io;
1591 
1592 	STAILQ_INIT(&tmp);
1593 
1594 	while (!STAILQ_EMPTY(queue)) {
1595 		bdev_io = STAILQ_FIRST(queue);
1596 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1597 		if (bdev_io->internal.ch == ch) {
1598 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1599 		} else {
1600 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1601 		}
1602 	}
1603 
1604 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1605 }
1606 
1607 /*
1608  * Abort I/O that are queued waiting for submission.  These types of I/O are
1609  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1610  */
1611 static void
1612 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1613 {
1614 	struct spdk_bdev_io *bdev_io, *tmp;
1615 
1616 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1617 		if (bdev_io->internal.ch == ch) {
1618 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1619 			/*
1620 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1621 			 *  been submitted to the bdev module.  Since in this case it
1622 			 *  hadn't, bump io_outstanding to account for the decrement
1623 			 *  that spdk_bdev_io_complete() will do.
1624 			 */
1625 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1626 				ch->io_outstanding++;
1627 				ch->shared_resource->io_outstanding++;
1628 			}
1629 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1630 		}
1631 	}
1632 }
1633 
1634 static void
1635 spdk_bdev_qos_channel_destroy(void *cb_arg)
1636 {
1637 	struct spdk_bdev_qos *qos = cb_arg;
1638 
1639 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1640 	spdk_poller_unregister(&qos->poller);
1641 
1642 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1643 
1644 	free(qos);
1645 }
1646 
1647 static int
1648 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1649 {
1650 	/*
1651 	 * Cleanly shutting down the QoS poller is tricky, because
1652 	 * during the asynchronous operation the user could open
1653 	 * a new descriptor and create a new channel, spawning
1654 	 * a new QoS poller.
1655 	 *
1656 	 * The strategy is to create a new QoS structure here and swap it
1657 	 * in. The shutdown path then continues to refer to the old one
1658 	 * until it completes and then releases it.
1659 	 */
1660 	struct spdk_bdev_qos *new_qos, *old_qos;
1661 
1662 	old_qos = bdev->internal.qos;
1663 
1664 	new_qos = calloc(1, sizeof(*new_qos));
1665 	if (!new_qos) {
1666 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1667 		return -ENOMEM;
1668 	}
1669 
1670 	/* Copy the old QoS data into the newly allocated structure */
1671 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1672 
1673 	/* Zero out the key parts of the QoS structure */
1674 	new_qos->ch = NULL;
1675 	new_qos->thread = NULL;
1676 	new_qos->max_ios_per_timeslice = 0;
1677 	new_qos->max_byte_per_timeslice = 0;
1678 	new_qos->io_remaining_this_timeslice = 0;
1679 	new_qos->byte_remaining_this_timeslice = 0;
1680 	new_qos->poller = NULL;
1681 	TAILQ_INIT(&new_qos->queued);
1682 
1683 	bdev->internal.qos = new_qos;
1684 
1685 	if (old_qos->thread == NULL) {
1686 		free(old_qos);
1687 	} else {
1688 		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1689 				     old_qos);
1690 	}
1691 
1692 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1693 	 * been destroyed yet. The destruction path will end up waiting for the final
1694 	 * channel to be put before it releases resources. */
1695 
1696 	return 0;
1697 }
1698 
1699 static void
1700 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
1701 {
1702 	total->bytes_read += add->bytes_read;
1703 	total->num_read_ops += add->num_read_ops;
1704 	total->bytes_written += add->bytes_written;
1705 	total->num_write_ops += add->num_write_ops;
1706 	total->read_latency_ticks += add->read_latency_ticks;
1707 	total->write_latency_ticks += add->write_latency_ticks;
1708 }
1709 
1710 static void
1711 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1712 {
1713 	struct spdk_bdev_channel	*ch = ctx_buf;
1714 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1715 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1716 
1717 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1718 		      spdk_get_thread());
1719 
1720 	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
1721 	pthread_mutex_lock(&ch->bdev->internal.mutex);
1722 	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1723 	pthread_mutex_unlock(&ch->bdev->internal.mutex);
1724 
1725 	mgmt_ch = shared_resource->mgmt_ch;
1726 
1727 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1728 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1729 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1730 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1731 
1732 	_spdk_bdev_channel_destroy_resource(ch);
1733 }
1734 
1735 int
1736 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1737 {
1738 	struct spdk_bdev_alias *tmp;
1739 
1740 	if (alias == NULL) {
1741 		SPDK_ERRLOG("Empty alias passed\n");
1742 		return -EINVAL;
1743 	}
1744 
1745 	if (spdk_bdev_get_by_name(alias)) {
1746 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1747 		return -EEXIST;
1748 	}
1749 
1750 	tmp = calloc(1, sizeof(*tmp));
1751 	if (tmp == NULL) {
1752 		SPDK_ERRLOG("Unable to allocate alias\n");
1753 		return -ENOMEM;
1754 	}
1755 
1756 	tmp->alias = strdup(alias);
1757 	if (tmp->alias == NULL) {
1758 		free(tmp);
1759 		SPDK_ERRLOG("Unable to allocate alias\n");
1760 		return -ENOMEM;
1761 	}
1762 
1763 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1764 
1765 	return 0;
1766 }
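
/*
 * Usage sketch (illustrative, not part of the API): adding and removing an alias
 * on a bdev that is already registered. The alias string "my_alias" and the
 * 'bdev' pointer are assumptions for the example; aliases resolve through
 * spdk_bdev_get_by_name() just like the primary name.
 *
 *	int rc;
 *
 *	rc = spdk_bdev_alias_add(bdev, "my_alias");
 *	if (rc == 0) {
 *		assert(spdk_bdev_get_by_name("my_alias") == bdev);
 *		spdk_bdev_alias_del(bdev, "my_alias");	// remove it again when done
 *	}
 */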
1767 
1768 int
1769 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1770 {
1771 	struct spdk_bdev_alias *tmp;
1772 
1773 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1774 		if (strcmp(alias, tmp->alias) == 0) {
1775 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1776 			free(tmp->alias);
1777 			free(tmp);
1778 			return 0;
1779 		}
1780 	}
1781 
1782 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1783 
1784 	return -ENOENT;
1785 }
1786 
1787 void
1788 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1789 {
1790 	struct spdk_bdev_alias *p, *tmp;
1791 
1792 	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1793 		TAILQ_REMOVE(&bdev->aliases, p, tailq);
1794 		free(p->alias);
1795 		free(p);
1796 	}
1797 }
1798 
1799 struct spdk_io_channel *
1800 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1801 {
1802 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1803 }
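
/*
 * Usage sketch: I/O channels are per-thread. Each SPDK thread that submits I/O
 * through a descriptor obtains its own channel and releases it with
 * spdk_put_io_channel() on that same thread when it is done. The descriptor
 * 'desc' below is assumed to come from spdk_bdev_open().
 *
 *	struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
 *
 *	if (ch == NULL) {
 *		// channel creation failed (e.g. out of memory)
 *	}
 *	// ... submit I/O on this thread using 'ch' ...
 *	spdk_put_io_channel(ch);
 */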
1804 
1805 const char *
1806 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1807 {
1808 	return bdev->name;
1809 }
1810 
1811 const char *
1812 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1813 {
1814 	return bdev->product_name;
1815 }
1816 
1817 const struct spdk_bdev_aliases_list *
1818 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1819 {
1820 	return &bdev->aliases;
1821 }
1822 
1823 uint32_t
1824 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1825 {
1826 	return bdev->blocklen;
1827 }
1828 
1829 uint64_t
1830 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1831 {
1832 	return bdev->blockcnt;
1833 }
1834 
1835 uint64_t
1836 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
1837 {
1838 	uint64_t iops_rate_limit = 0;
1839 
1840 	pthread_mutex_lock(&bdev->internal.mutex);
1841 	if (bdev->internal.qos) {
1842 		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
1843 	}
1844 	pthread_mutex_unlock(&bdev->internal.mutex);
1845 
1846 	return iops_rate_limit;
1847 }
1848 
1849 size_t
1850 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1851 {
1852 	/* TODO: push this logic down to the bdev modules */
1853 	if (bdev->need_aligned_buffer) {
1854 		return bdev->blocklen;
1855 	}
1856 
1857 	return 1;
1858 }
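
/*
 * Usage sketch: allocate a data buffer that satisfies the bdev's alignment
 * requirement before issuing I/O. spdk_dma_zmalloc()/spdk_dma_free() from
 * spdk/env.h are used here on the assumption that the buffer must be DMA-safe;
 * the length and names are illustrative.
 *
 *	size_t align = spdk_bdev_get_buf_align(bdev);
 *	size_t len = 4 * spdk_bdev_get_block_size(bdev);
 *	void *buf = spdk_dma_zmalloc(len, align, NULL);
 *
 *	if (buf == NULL) {
 *		// allocation failed
 *	}
 *	// ... use buf with spdk_bdev_read()/spdk_bdev_write() ...
 *	spdk_dma_free(buf);
 */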
1859 
1860 uint32_t
1861 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1862 {
1863 	return bdev->optimal_io_boundary;
1864 }
1865 
1866 bool
1867 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1868 {
1869 	return bdev->write_cache;
1870 }
1871 
1872 const struct spdk_uuid *
1873 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1874 {
1875 	return &bdev->uuid;
1876 }
1877 
1878 uint64_t
1879 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
1880 {
1881 	return bdev->internal.measured_queue_depth;
1882 }
1883 
1884 uint64_t
1885 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
1886 {
1887 	return bdev->internal.period;
1888 }
1889 
1890 uint64_t
1891 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
1892 {
1893 	return bdev->internal.weighted_io_time;
1894 }
1895 
1896 uint64_t
1897 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
1898 {
1899 	return bdev->internal.io_time;
1900 }
1901 
1902 static void
1903 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
1904 {
1905 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
1906 
1907 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
1908 
1909 	if (bdev->internal.measured_queue_depth) {
1910 		bdev->internal.io_time += bdev->internal.period;
1911 		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
1912 	}
1913 }
1914 
1915 static void
1916 _calculate_measured_qd(struct spdk_io_channel_iter *i)
1917 {
1918 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
1919 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
1920 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
1921 
1922 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
1923 	spdk_for_each_channel_continue(i, 0);
1924 }
1925 
1926 static int
1927 spdk_bdev_calculate_measured_queue_depth(void *ctx)
1928 {
1929 	struct spdk_bdev *bdev = ctx;
1930 	bdev->internal.temporary_queue_depth = 0;
1931 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
1932 			      _calculate_measured_qd_cpl);
1933 	return 0;
1934 }
1935 
1936 void
1937 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
1938 {
1939 	bdev->internal.period = period;
1940 
1941 	if (bdev->internal.qd_poller != NULL) {
1942 		spdk_poller_unregister(&bdev->internal.qd_poller);
1943 		bdev->internal.measured_queue_depth = UINT64_MAX;
1944 	}
1945 
1946 	if (period != 0) {
1947 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
1948 					   period);
1949 	}
1950 }
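
/*
 * Usage sketch: enable queue depth sampling and read the results later. The
 * period is passed directly to spdk_poller_register(), so it is expressed in
 * microseconds. spdk_bdev_get_qd() returns UINT64_MAX until a sample has been
 * taken or after sampling is disabled.
 *
 *	spdk_bdev_set_qd_sampling_period(bdev, 1000);	// sample every 1 ms
 *	// ...
 *	uint64_t qd = spdk_bdev_get_qd(bdev);
 *	if (qd != UINT64_MAX) {
 *		// 'qd' is the number of I/O outstanding at the last sample point
 *	}
 *	spdk_bdev_set_qd_sampling_period(bdev, 0);	// disable sampling
 */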
1951 
1952 int
1953 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1954 {
1955 	int ret;
1956 
1957 	pthread_mutex_lock(&bdev->internal.mutex);
1958 
1959 	/* bdev has open descriptors */
1960 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
1961 	    bdev->blockcnt > size) {
1962 		ret = -EBUSY;
1963 	} else {
1964 		bdev->blockcnt = size;
1965 		ret = 0;
1966 	}
1967 
1968 	pthread_mutex_unlock(&bdev->internal.mutex);
1969 
1970 	return ret;
1971 }
1972 
1973 /*
1974  * Convert I/O offset and length from bytes to blocks.
1975  *
1976  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1977  */
1978 static uint64_t
1979 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1980 			  uint64_t num_bytes, uint64_t *num_blocks)
1981 {
1982 	uint32_t block_size = bdev->blocklen;
1983 
1984 	*offset_blocks = offset_bytes / block_size;
1985 	*num_blocks = num_bytes / block_size;
1986 
1987 	return (offset_bytes % block_size) | (num_bytes % block_size);
1988 }
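
/*
 * Worked example (illustrative): with blocklen == 512, offset_bytes == 4096 and
 * num_bytes == 8192 yield offset_blocks == 8, num_blocks == 16 and a return
 * value of 0. With offset_bytes == 4100, the remainder (4100 % 512) is non-zero,
 * so the call reports failure and the byte-based wrappers below return -EINVAL.
 */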
1989 
1990 static bool
1991 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1992 {
1993 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
1994 	 * that an overflow occurred and the offset wrapped around. */
1995 	if (offset_blocks + num_blocks < offset_blocks) {
1996 		return false;
1997 	}
1998 
1999 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
2000 	if (offset_blocks + num_blocks > bdev->blockcnt) {
2001 		return false;
2002 	}
2003 
2004 	return true;
2005 }
2006 
2007 int
2008 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2009 	       void *buf, uint64_t offset, uint64_t nbytes,
2010 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
2011 {
2012 	uint64_t offset_blocks, num_blocks;
2013 
2014 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2015 		return -EINVAL;
2016 	}
2017 
2018 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2019 }
2020 
2021 int
2022 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2023 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2024 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
2025 {
2026 	struct spdk_bdev *bdev = desc->bdev;
2027 	struct spdk_bdev_io *bdev_io;
2028 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2029 
2030 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2031 		return -EINVAL;
2032 	}
2033 
2034 	bdev_io = spdk_bdev_get_io(channel);
2035 	if (!bdev_io) {
2036 		return -ENOMEM;
2037 	}
2038 
2039 	bdev_io->internal.ch = channel;
2040 	bdev_io->internal.desc = desc;
2041 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2042 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2043 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2044 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2045 	bdev_io->u.bdev.iovcnt = 1;
2046 	bdev_io->u.bdev.num_blocks = num_blocks;
2047 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2048 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2049 
2050 	spdk_bdev_io_submit(bdev_io);
2051 	return 0;
2052 }
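
/*
 * Usage sketch: submit a single-buffer read and free the spdk_bdev_io in the
 * completion callback. 'desc', 'ch' and 'buf' are assumed to come from
 * spdk_bdev_open(), spdk_bdev_get_io_channel() and an allocation that honors
 * spdk_bdev_get_buf_align(); the names are illustrative.
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		if (!success) {
 *			// handle the I/O error
 *		}
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	int rc = spdk_bdev_read_blocks(desc, ch, buf, 0, 8, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// no spdk_bdev_io available right now; see spdk_bdev_queue_io_wait()
 *	}
 */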
2053 
2054 int
2055 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2056 		struct iovec *iov, int iovcnt,
2057 		uint64_t offset, uint64_t nbytes,
2058 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2059 {
2060 	uint64_t offset_blocks, num_blocks;
2061 
2062 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2063 		return -EINVAL;
2064 	}
2065 
2066 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2067 }
2068 
2069 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2070 			   struct iovec *iov, int iovcnt,
2071 			   uint64_t offset_blocks, uint64_t num_blocks,
2072 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2073 {
2074 	struct spdk_bdev *bdev = desc->bdev;
2075 	struct spdk_bdev_io *bdev_io;
2076 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2077 
2078 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2079 		return -EINVAL;
2080 	}
2081 
2082 	bdev_io = spdk_bdev_get_io(channel);
2083 	if (!bdev_io) {
2084 		return -ENOMEM;
2085 	}
2086 
2087 	bdev_io->internal.ch = channel;
2088 	bdev_io->internal.desc = desc;
2089 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2090 	bdev_io->u.bdev.iovs = iov;
2091 	bdev_io->u.bdev.iovcnt = iovcnt;
2092 	bdev_io->u.bdev.num_blocks = num_blocks;
2093 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2094 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2095 
2096 	spdk_bdev_io_submit(bdev_io);
2097 	return 0;
2098 }
2099 
2100 int
2101 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2102 		void *buf, uint64_t offset, uint64_t nbytes,
2103 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2104 {
2105 	uint64_t offset_blocks, num_blocks;
2106 
2107 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2108 		return -EINVAL;
2109 	}
2110 
2111 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2112 }
2113 
2114 int
2115 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2116 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2117 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2118 {
2119 	struct spdk_bdev *bdev = desc->bdev;
2120 	struct spdk_bdev_io *bdev_io;
2121 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2122 
2123 	if (!desc->write) {
2124 		return -EBADF;
2125 	}
2126 
2127 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2128 		return -EINVAL;
2129 	}
2130 
2131 	bdev_io = spdk_bdev_get_io(channel);
2132 	if (!bdev_io) {
2133 		return -ENOMEM;
2134 	}
2135 
2136 	bdev_io->internal.ch = channel;
2137 	bdev_io->internal.desc = desc;
2138 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2139 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2140 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2141 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2142 	bdev_io->u.bdev.iovcnt = 1;
2143 	bdev_io->u.bdev.num_blocks = num_blocks;
2144 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2145 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2146 
2147 	spdk_bdev_io_submit(bdev_io);
2148 	return 0;
2149 }
2150 
2151 int
2152 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2153 		 struct iovec *iov, int iovcnt,
2154 		 uint64_t offset, uint64_t len,
2155 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
2156 {
2157 	uint64_t offset_blocks, num_blocks;
2158 
2159 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2160 		return -EINVAL;
2161 	}
2162 
2163 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2164 }
2165 
2166 int
2167 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2168 			struct iovec *iov, int iovcnt,
2169 			uint64_t offset_blocks, uint64_t num_blocks,
2170 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2171 {
2172 	struct spdk_bdev *bdev = desc->bdev;
2173 	struct spdk_bdev_io *bdev_io;
2174 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2175 
2176 	if (!desc->write) {
2177 		return -EBADF;
2178 	}
2179 
2180 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2181 		return -EINVAL;
2182 	}
2183 
2184 	bdev_io = spdk_bdev_get_io(channel);
2185 	if (!bdev_io) {
2186 		return -ENOMEM;
2187 	}
2188 
2189 	bdev_io->internal.ch = channel;
2190 	bdev_io->internal.desc = desc;
2191 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2192 	bdev_io->u.bdev.iovs = iov;
2193 	bdev_io->u.bdev.iovcnt = iovcnt;
2194 	bdev_io->u.bdev.num_blocks = num_blocks;
2195 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2196 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2197 
2198 	spdk_bdev_io_submit(bdev_io);
2199 	return 0;
2200 }
2201 
2202 int
2203 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2204 		       uint64_t offset, uint64_t len,
2205 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2206 {
2207 	uint64_t offset_blocks, num_blocks;
2208 
2209 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2210 		return -EINVAL;
2211 	}
2212 
2213 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2214 }
2215 
2216 int
2217 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2218 			      uint64_t offset_blocks, uint64_t num_blocks,
2219 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2220 {
2221 	struct spdk_bdev *bdev = desc->bdev;
2222 	struct spdk_bdev_io *bdev_io;
2223 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2224 
2225 	if (!desc->write) {
2226 		return -EBADF;
2227 	}
2228 
2229 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2230 		return -EINVAL;
2231 	}
2232 
2233 	bdev_io = spdk_bdev_get_io(channel);
2234 
2235 	if (!bdev_io) {
2236 		return -ENOMEM;
2237 	}
2238 
2239 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2240 	bdev_io->internal.ch = channel;
2241 	bdev_io->internal.desc = desc;
2242 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2243 	bdev_io->u.bdev.num_blocks = num_blocks;
2244 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2245 
2246 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2247 		spdk_bdev_io_submit(bdev_io);
2248 		return 0;
2249 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2250 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2251 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2252 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2253 		_spdk_bdev_write_zero_buffer_next(bdev_io);
2254 		return 0;
2255 	} else {
2256 		spdk_bdev_free_io(bdev_io);
2257 		return -ENOTSUP;
2258 	}
2259 }
2260 
2261 int
2262 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2263 		uint64_t offset, uint64_t nbytes,
2264 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2265 {
2266 	uint64_t offset_blocks, num_blocks;
2267 
2268 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2269 		return -EINVAL;
2270 	}
2271 
2272 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2273 }
2274 
2275 int
2276 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2277 		       uint64_t offset_blocks, uint64_t num_blocks,
2278 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2279 {
2280 	struct spdk_bdev *bdev = desc->bdev;
2281 	struct spdk_bdev_io *bdev_io;
2282 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2283 
2284 	if (!desc->write) {
2285 		return -EBADF;
2286 	}
2287 
2288 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2289 		return -EINVAL;
2290 	}
2291 
2292 	if (num_blocks == 0) {
2293 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
2294 		return -EINVAL;
2295 	}
2296 
2297 	bdev_io = spdk_bdev_get_io(channel);
2298 	if (!bdev_io) {
2299 		return -ENOMEM;
2300 	}
2301 
2302 	bdev_io->internal.ch = channel;
2303 	bdev_io->internal.desc = desc;
2304 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2305 
2306 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2307 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2308 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2309 	bdev_io->u.bdev.iovcnt = 1;
2310 
2311 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2312 	bdev_io->u.bdev.num_blocks = num_blocks;
2313 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2314 
2315 	spdk_bdev_io_submit(bdev_io);
2316 	return 0;
2317 }
2318 
2319 int
2320 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2321 		uint64_t offset, uint64_t length,
2322 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2323 {
2324 	uint64_t offset_blocks, num_blocks;
2325 
2326 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2327 		return -EINVAL;
2328 	}
2329 
2330 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2331 }
2332 
2333 int
2334 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2335 		       uint64_t offset_blocks, uint64_t num_blocks,
2336 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2337 {
2338 	struct spdk_bdev *bdev = desc->bdev;
2339 	struct spdk_bdev_io *bdev_io;
2340 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2341 
2342 	if (!desc->write) {
2343 		return -EBADF;
2344 	}
2345 
2346 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2347 		return -EINVAL;
2348 	}
2349 
2350 	bdev_io = spdk_bdev_get_io(channel);
2351 	if (!bdev_io) {
2352 		return -ENOMEM;
2353 	}
2354 
2355 	bdev_io->internal.ch = channel;
2356 	bdev_io->internal.desc = desc;
2357 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2358 	bdev_io->u.bdev.iovs = NULL;
2359 	bdev_io->u.bdev.iovcnt = 0;
2360 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2361 	bdev_io->u.bdev.num_blocks = num_blocks;
2362 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2363 
2364 	spdk_bdev_io_submit(bdev_io);
2365 	return 0;
2366 }
2367 
2368 static void
2369 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2370 {
2371 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2372 	struct spdk_bdev_io *bdev_io;
2373 
2374 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2375 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2376 	spdk_bdev_io_submit_reset(bdev_io);
2377 }
2378 
2379 static void
2380 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2381 {
2382 	struct spdk_io_channel		*ch;
2383 	struct spdk_bdev_channel	*channel;
2384 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2385 	struct spdk_bdev_shared_resource *shared_resource;
2386 	bdev_io_tailq_t			tmp_queued;
2387 
2388 	TAILQ_INIT(&tmp_queued);
2389 
2390 	ch = spdk_io_channel_iter_get_channel(i);
2391 	channel = spdk_io_channel_get_ctx(ch);
2392 	shared_resource = channel->shared_resource;
2393 	mgmt_channel = shared_resource->mgmt_ch;
2394 
2395 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2396 
2397 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2398 		/* The QoS object is always valid and readable while
2399 		 * the channel flag is set, so the lock here should not
2400 		 * be necessary. We're not in the fast path though, so
2401 		 * just take it anyway. */
2402 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2403 		if (channel->bdev->internal.qos->ch == channel) {
2404 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2405 		}
2406 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2407 	}
2408 
2409 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2410 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2411 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2412 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2413 
2414 	spdk_for_each_channel_continue(i, 0);
2415 }
2416 
2417 static void
2418 _spdk_bdev_start_reset(void *ctx)
2419 {
2420 	struct spdk_bdev_channel *ch = ctx;
2421 
2422 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2423 			      ch, _spdk_bdev_reset_dev);
2424 }
2425 
2426 static void
2427 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2428 {
2429 	struct spdk_bdev *bdev = ch->bdev;
2430 
2431 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2432 
2433 	pthread_mutex_lock(&bdev->internal.mutex);
2434 	if (bdev->internal.reset_in_progress == NULL) {
2435 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2436 		/*
2437 		 * Take a channel reference for the target bdev for the life of this
2438 		 *  reset.  This guards against the channel getting destroyed while
2439 		 *  spdk_for_each_channel() calls related to this reset IO are in
2440 		 *  progress.  We will release the reference when this reset is
2441 		 *  completed.
2442 		 */
2443 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2444 		_spdk_bdev_start_reset(ch);
2445 	}
2446 	pthread_mutex_unlock(&bdev->internal.mutex);
2447 }
2448 
2449 int
2450 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2451 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2452 {
2453 	struct spdk_bdev *bdev = desc->bdev;
2454 	struct spdk_bdev_io *bdev_io;
2455 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2456 
2457 	bdev_io = spdk_bdev_get_io(channel);
2458 	if (!bdev_io) {
2459 		return -ENOMEM;
2460 	}
2461 
2462 	bdev_io->internal.ch = channel;
2463 	bdev_io->internal.desc = desc;
2464 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2465 	bdev_io->u.reset.ch_ref = NULL;
2466 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2467 
2468 	pthread_mutex_lock(&bdev->internal.mutex);
2469 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2470 	pthread_mutex_unlock(&bdev->internal.mutex);
2471 
2472 	_spdk_bdev_channel_start_reset(channel);
2473 
2474 	return 0;
2475 }
2476 
2477 void
2478 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2479 		      struct spdk_bdev_io_stat *stat)
2480 {
2481 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2482 
2483 	*stat = channel->stat;
2484 }
2485 
2486 static void
2487 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2488 {
2489 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2490 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2491 
2492 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2493 			    bdev_iostat_ctx->cb_arg, 0);
2494 	free(bdev_iostat_ctx);
2495 }
2496 
2497 static void
2498 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2499 {
2500 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2501 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2502 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2503 
2504 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
2505 	spdk_for_each_channel_continue(i, 0);
2506 }
2507 
2508 void
2509 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2510 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2511 {
2512 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2513 
2514 	assert(bdev != NULL);
2515 	assert(stat != NULL);
2516 	assert(cb != NULL);
2517 
2518 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2519 	if (bdev_iostat_ctx == NULL) {
2520 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2521 		cb(bdev, stat, cb_arg, -ENOMEM);
2522 		return;
2523 	}
2524 
2525 	bdev_iostat_ctx->stat = stat;
2526 	bdev_iostat_ctx->cb = cb;
2527 	bdev_iostat_ctx->cb_arg = cb_arg;
2528 
2529 	/* Start with the statistics from previously deleted channels. */
2530 	pthread_mutex_lock(&bdev->internal.mutex);
2531 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
2532 	pthread_mutex_unlock(&bdev->internal.mutex);
2533 
2534 	/* Then iterate and add the statistics from each existing channel. */
2535 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2536 			      _spdk_bdev_get_each_channel_stat,
2537 			      bdev_iostat_ctx,
2538 			      _spdk_bdev_get_device_stat_done);
2539 }
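
/*
 * Usage sketch: aggregate per-channel statistics for a bdev. The caller owns the
 * spdk_bdev_io_stat structure and must keep it valid until the callback fires,
 * which happens once the iteration over all channels completes (or immediately
 * with -ENOMEM on allocation failure). The names below are illustrative.
 *
 *	static void
 *	stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("%s: %" PRIu64 " reads, %" PRIu64 " writes\n",
 *			       spdk_bdev_get_name(bdev), stat->num_read_ops, stat->num_write_ops);
 *		}
 *		free(stat);
 *	}
 *
 *	struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
 *	if (stat != NULL) {
 *		spdk_bdev_get_device_stat(bdev, stat, stat_done, NULL);
 *	}
 */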
2540 
2541 int
2542 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2543 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2544 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2545 {
2546 	struct spdk_bdev *bdev = desc->bdev;
2547 	struct spdk_bdev_io *bdev_io;
2548 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2549 
2550 	if (!desc->write) {
2551 		return -EBADF;
2552 	}
2553 
2554 	bdev_io = spdk_bdev_get_io(channel);
2555 	if (!bdev_io) {
2556 		return -ENOMEM;
2557 	}
2558 
2559 	bdev_io->internal.ch = channel;
2560 	bdev_io->internal.desc = desc;
2561 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2562 	bdev_io->u.nvme_passthru.cmd = *cmd;
2563 	bdev_io->u.nvme_passthru.buf = buf;
2564 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2565 	bdev_io->u.nvme_passthru.md_buf = NULL;
2566 	bdev_io->u.nvme_passthru.md_len = 0;
2567 
2568 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2569 
2570 	spdk_bdev_io_submit(bdev_io);
2571 	return 0;
2572 }
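
/*
 * Usage sketch: send an NVMe admin command through the passthru interface. The
 * descriptor must have been opened for write. The Identify Controller encoding
 * (opcode, CNS in cdw10, 4096-byte payload) follows the NVMe specification and
 * is shown only as an illustration; 'desc', 'ch' and 'admin_done' are assumed
 * to exist already.
 *
 *	struct spdk_nvme_cmd cmd;
 *	void *payload = spdk_dma_zmalloc(4096, 0, NULL);
 *
 *	memset(&cmd, 0, sizeof(cmd));
 *	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *	cmd.cdw10 = 1;	// CNS = 1: identify controller
 *	if (payload != NULL) {
 *		spdk_bdev_nvme_admin_passthru(desc, ch, &cmd, payload, 4096, admin_done, NULL);
 *	}
 */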
2573 
2574 int
2575 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2576 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2577 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2578 {
2579 	struct spdk_bdev *bdev = desc->bdev;
2580 	struct spdk_bdev_io *bdev_io;
2581 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2582 
2583 	if (!desc->write) {
2584 		/*
2585 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2586 		 *  to easily determine if the command is a read or write, but for now just
2587 		 *  do not allow io_passthru with a read-only descriptor.
2588 		 */
2589 		return -EBADF;
2590 	}
2591 
2592 	bdev_io = spdk_bdev_get_io(channel);
2593 	if (!bdev_io) {
2594 		return -ENOMEM;
2595 	}
2596 
2597 	bdev_io->internal.ch = channel;
2598 	bdev_io->internal.desc = desc;
2599 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2600 	bdev_io->u.nvme_passthru.cmd = *cmd;
2601 	bdev_io->u.nvme_passthru.buf = buf;
2602 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2603 	bdev_io->u.nvme_passthru.md_buf = NULL;
2604 	bdev_io->u.nvme_passthru.md_len = 0;
2605 
2606 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2607 
2608 	spdk_bdev_io_submit(bdev_io);
2609 	return 0;
2610 }
2611 
2612 int
2613 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2614 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2615 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2616 {
2617 	struct spdk_bdev *bdev = desc->bdev;
2618 	struct spdk_bdev_io *bdev_io;
2619 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2620 
2621 	if (!desc->write) {
2622 		/*
2623 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2624 		 *  to easily determine if the command is a read or write, but for now just
2625 		 *  do not allow io_passthru with a read-only descriptor.
2626 		 */
2627 		return -EBADF;
2628 	}
2629 
2630 	bdev_io = spdk_bdev_get_io(channel);
2631 	if (!bdev_io) {
2632 		return -ENOMEM;
2633 	}
2634 
2635 	bdev_io->internal.ch = channel;
2636 	bdev_io->internal.desc = desc;
2637 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2638 	bdev_io->u.nvme_passthru.cmd = *cmd;
2639 	bdev_io->u.nvme_passthru.buf = buf;
2640 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2641 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2642 	bdev_io->u.nvme_passthru.md_len = md_len;
2643 
2644 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2645 
2646 	spdk_bdev_io_submit(bdev_io);
2647 	return 0;
2648 }
2649 
2650 int
2651 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2652 			struct spdk_bdev_io_wait_entry *entry)
2653 {
2654 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2655 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2656 
2657 	if (bdev != entry->bdev) {
2658 		SPDK_ERRLOG("bdevs do not match\n");
2659 		return -EINVAL;
2660 	}
2661 
2662 	if (mgmt_ch->per_thread_cache_count > 0) {
2663 		SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2664 		return -EINVAL;
2665 	}
2666 
2667 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2668 	return 0;
2669 }
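
/*
 * Usage sketch: retry a submission that failed with -ENOMEM once an spdk_bdev_io
 * becomes available again, mirroring how _spdk_bdev_write_zero_buffer_next()
 * uses its internal wait entry later in this file. 'my_ctx', 'retry_read' and
 * 'read_done' are illustrative; the wait entry must stay allocated until its
 * callback runs.
 *
 *	rc = spdk_bdev_read_blocks(my_ctx->desc, my_ctx->ch, my_ctx->buf, 0, 8,
 *				   read_done, my_ctx);
 *	if (rc == -ENOMEM) {
 *		my_ctx->bdev_io_wait.bdev = bdev;
 *		my_ctx->bdev_io_wait.cb_fn = retry_read;	// void retry_read(void *arg)
 *		my_ctx->bdev_io_wait.cb_arg = my_ctx;
 *		spdk_bdev_queue_io_wait(bdev, my_ctx->ch, &my_ctx->bdev_io_wait);
 *	}
 */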
2670 
2671 static void
2672 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2673 {
2674 	struct spdk_bdev *bdev = bdev_ch->bdev;
2675 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2676 	struct spdk_bdev_io *bdev_io;
2677 
2678 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2679 		/*
2680 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2681 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2682 		 *  the context of a completion, because the resources for the I/O are
2683 		 *  not released until control returns to the bdev poller.  Also, we
2684 		 *  may require several small I/O to complete before a larger I/O
2685 		 *  (that requires splitting) can be submitted.
2686 		 */
2687 		return;
2688 	}
2689 
2690 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2691 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2692 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2693 		bdev_io->internal.ch->io_outstanding++;
2694 		shared_resource->io_outstanding++;
2695 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2696 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2697 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2698 			break;
2699 		}
2700 	}
2701 }
2702 
2703 static inline void
2704 _spdk_bdev_io_complete(void *ctx)
2705 {
2706 	struct spdk_bdev_io *bdev_io = ctx;
2707 	uint64_t tsc;
2708 
2709 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2710 		/*
2711 		 * Send the completion to the thread that originally submitted the I/O,
2712 		 * which may not be the current thread in the case of QoS.
2713 		 */
2714 		if (bdev_io->internal.io_submit_ch) {
2715 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2716 			bdev_io->internal.io_submit_ch = NULL;
2717 		}
2718 
2719 		/*
2720 		 * Defer completion to avoid potential infinite recursion if the
2721 		 * user's completion callback issues a new I/O.
2722 		 */
2723 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2724 				     _spdk_bdev_io_complete, bdev_io);
2725 		return;
2726 	}
2727 
2728 	tsc = spdk_get_ticks();
2729 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
2730 
2731 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2732 		switch (bdev_io->type) {
2733 		case SPDK_BDEV_IO_TYPE_READ:
2734 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2735 			bdev_io->internal.ch->stat.num_read_ops++;
2736 			bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2737 			break;
2738 		case SPDK_BDEV_IO_TYPE_WRITE:
2739 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2740 			bdev_io->internal.ch->stat.num_write_ops++;
2741 			bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2742 			break;
2743 		default:
2744 			break;
2745 		}
2746 	}
2747 
2748 #ifdef SPDK_CONFIG_VTUNE
2749 	uint64_t now_tsc = spdk_get_ticks();
2750 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2751 		uint64_t data[5];
2752 
2753 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2754 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2755 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2756 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2757 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2758 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2759 
2760 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2761 				   __itt_metadata_u64, 5, data);
2762 
2763 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2764 		bdev_io->internal.ch->start_tsc = now_tsc;
2765 	}
2766 #endif
2767 
2768 	assert(bdev_io->internal.cb != NULL);
2769 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2770 
2771 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2772 			     bdev_io->internal.caller_ctx);
2773 }
2774 
2775 static void
2776 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2777 {
2778 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2779 
2780 	if (bdev_io->u.reset.ch_ref != NULL) {
2781 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2782 		bdev_io->u.reset.ch_ref = NULL;
2783 	}
2784 
2785 	_spdk_bdev_io_complete(bdev_io);
2786 }
2787 
2788 static void
2789 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2790 {
2791 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2792 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2793 
2794 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2795 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2796 		_spdk_bdev_channel_start_reset(ch);
2797 	}
2798 
2799 	spdk_for_each_channel_continue(i, 0);
2800 }
2801 
2802 void
2803 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2804 {
2805 	struct spdk_bdev *bdev = bdev_io->bdev;
2806 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2807 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2808 
2809 	bdev_io->internal.status = status;
2810 
2811 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2812 		bool unlock_channels = false;
2813 
2814 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2815 			SPDK_ERRLOG("NOMEM returned for reset\n");
2816 		}
2817 		pthread_mutex_lock(&bdev->internal.mutex);
2818 		if (bdev_io == bdev->internal.reset_in_progress) {
2819 			bdev->internal.reset_in_progress = NULL;
2820 			unlock_channels = true;
2821 		}
2822 		pthread_mutex_unlock(&bdev->internal.mutex);
2823 
2824 		if (unlock_channels) {
2825 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2826 					      bdev_io, _spdk_bdev_reset_complete);
2827 			return;
2828 		}
2829 	} else {
2830 		assert(bdev_ch->io_outstanding > 0);
2831 		assert(shared_resource->io_outstanding > 0);
2832 		bdev_ch->io_outstanding--;
2833 		shared_resource->io_outstanding--;
2834 
2835 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2836 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2837 			/*
2838 			 * Wait for some of the outstanding I/O to complete before we
2839 			 *  retry any of the nomem_io.  Normally we will wait for
2840 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2841 			 *  depth channels we will instead wait for half to complete.
2842 			 */
2843 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2844 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2845 			return;
2846 		}
2847 
2848 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2849 			_spdk_bdev_ch_retry_io(bdev_ch);
2850 		}
2851 	}
2852 
2853 	_spdk_bdev_io_complete(bdev_io);
2854 }
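
/*
 * Usage sketch for bdev module authors: when the backing device finishes a
 * request, complete the spdk_bdev_io with the appropriate status. Completing
 * with SPDK_BDEV_IO_STATUS_NOMEM makes the generic layer queue the I/O on the
 * shared nomem_io list and resubmit it later, as implemented above. The
 * 'backend_error' flag is illustrative.
 *
 *	spdk_bdev_io_complete(bdev_io, backend_error ?
 *			      SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS);
 */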
2855 
2856 void
2857 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2858 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2859 {
2860 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2861 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2862 	} else {
2863 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2864 		bdev_io->internal.error.scsi.sc = sc;
2865 		bdev_io->internal.error.scsi.sk = sk;
2866 		bdev_io->internal.error.scsi.asc = asc;
2867 		bdev_io->internal.error.scsi.ascq = ascq;
2868 	}
2869 
2870 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2871 }
2872 
2873 void
2874 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2875 			     int *sc, int *sk, int *asc, int *ascq)
2876 {
2877 	assert(sc != NULL);
2878 	assert(sk != NULL);
2879 	assert(asc != NULL);
2880 	assert(ascq != NULL);
2881 
2882 	switch (bdev_io->internal.status) {
2883 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2884 		*sc = SPDK_SCSI_STATUS_GOOD;
2885 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2886 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2887 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2888 		break;
2889 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2890 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2891 		break;
2892 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2893 		*sc = bdev_io->internal.error.scsi.sc;
2894 		*sk = bdev_io->internal.error.scsi.sk;
2895 		*asc = bdev_io->internal.error.scsi.asc;
2896 		*ascq = bdev_io->internal.error.scsi.ascq;
2897 		break;
2898 	default:
2899 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2900 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2901 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2902 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2903 		break;
2904 	}
2905 }
2906 
2907 void
2908 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2909 {
2910 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2911 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2912 	} else {
2913 		bdev_io->internal.error.nvme.sct = sct;
2914 		bdev_io->internal.error.nvme.sc = sc;
2915 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2916 	}
2917 
2918 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2919 }
2920 
2921 void
2922 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2923 {
2924 	assert(sct != NULL);
2925 	assert(sc != NULL);
2926 
2927 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2928 		*sct = bdev_io->internal.error.nvme.sct;
2929 		*sc = bdev_io->internal.error.nvme.sc;
2930 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2931 		*sct = SPDK_NVME_SCT_GENERIC;
2932 		*sc = SPDK_NVME_SC_SUCCESS;
2933 	} else {
2934 		*sct = SPDK_NVME_SCT_GENERIC;
2935 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2936 	}
2937 }
2938 
2939 struct spdk_thread *
2940 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2941 {
2942 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
2943 }
2944 
2945 static void
2946 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
2947 			   enum spdk_bdev_qos_type qos_type)
2948 {
2949 	uint64_t	min_qos_set = 0;
2950 
2951 	switch (qos_type) {
2952 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2953 		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
2954 		break;
2955 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2956 		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
2957 		break;
2958 	default:
2959 		SPDK_ERRLOG("Unsupported QoS type.\n");
2960 		return;
2961 	}
2962 
2963 	if (qos_set % min_qos_set) {
2964 		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
2965 			    qos_set, bdev->name, min_qos_set);
2966 		SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name);
2967 		return;
2968 	}
2969 
2970 	if (!bdev->internal.qos) {
2971 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
2972 		if (!bdev->internal.qos) {
2973 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
2974 			return;
2975 		}
2976 	}
2977 
2978 	switch (qos_type) {
2979 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2980 		bdev->internal.qos->iops_rate_limit = qos_set;
2981 		break;
2982 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2983 		bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024;
2984 		break;
2985 	default:
2986 		break;
2987 	}
2988 
2989 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
2990 		      bdev->name, qos_type, qos_set);
2991 
2992 	return;
2993 }
2994 
2995 static void
2996 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2997 {
2998 	struct spdk_conf_section	*sp = NULL;
2999 	const char			*val = NULL;
3000 	uint64_t			qos_set = 0;
3001 	int				i = 0, j = 0;
3002 
3003 	sp = spdk_conf_find_section(NULL, "QoS");
3004 	if (!sp) {
3005 		return;
3006 	}
3007 
3008 	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
3009 		i = 0;
3010 		while (true) {
3011 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
3012 			if (!val) {
3013 				break;
3014 			}
3015 
3016 			if (strcmp(bdev->name, val) != 0) {
3017 				i++;
3018 				continue;
3019 			}
3020 
3021 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
3022 			if (val) {
3023 				qos_set = strtoull(val, NULL, 10);
3024 				_spdk_bdev_qos_config_type(bdev, qos_set, j);
3025 			}
3026 
3027 			break;
3028 		}
3029 
3030 		j++;
3031 	}
3032 
3033 	return;
3034 }
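
/*
 * Illustrative configuration snippet parsed by this function (the bdev name
 * "Malloc0" and the values are hypothetical). Each line names a bdev and a rate
 * limit; limits must be multiples of the per-type minimum checked in
 * _spdk_bdev_qos_config_type(), and the bandwidth limit is given in MB/s.
 *
 *	[QoS]
 *	  Limit_IOPS Malloc0 20000
 *	  Limit_BWPS Malloc0 100
 */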
3035 
3036 static int
3037 spdk_bdev_init(struct spdk_bdev *bdev)
3038 {
3039 	assert(bdev->module != NULL);
3040 
3041 	if (!bdev->name) {
3042 		SPDK_ERRLOG("Bdev name is NULL\n");
3043 		return -EINVAL;
3044 	}
3045 
3046 	if (spdk_bdev_get_by_name(bdev->name)) {
3047 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3048 		return -EEXIST;
3049 	}
3050 
3051 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
3052 	bdev->internal.measured_queue_depth = UINT64_MAX;
3053 
3054 	TAILQ_INIT(&bdev->internal.open_descs);
3055 
3056 	TAILQ_INIT(&bdev->aliases);
3057 
3058 	bdev->internal.reset_in_progress = NULL;
3059 
3060 	_spdk_bdev_qos_config(bdev);
3061 
3062 	spdk_io_device_register(__bdev_to_io_dev(bdev),
3063 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3064 				sizeof(struct spdk_bdev_channel));
3065 
3066 	pthread_mutex_init(&bdev->internal.mutex, NULL);
3067 	return 0;
3068 }
3069 
3070 static void
3071 spdk_bdev_destroy_cb(void *io_device)
3072 {
3073 	int			rc;
3074 	struct spdk_bdev	*bdev;
3075 	spdk_bdev_unregister_cb	cb_fn;
3076 	void			*cb_arg;
3077 
3078 	bdev = __bdev_from_io_dev(io_device);
3079 	cb_fn = bdev->internal.unregister_cb;
3080 	cb_arg = bdev->internal.unregister_ctx;
3081 
3082 	rc = bdev->fn_table->destruct(bdev->ctxt);
3083 	if (rc < 0) {
3084 		SPDK_ERRLOG("destruct failed\n");
3085 	}
3086 	if (rc <= 0 && cb_fn != NULL) {
3087 		cb_fn(cb_arg, rc);
3088 	}
3089 }
3090 
3091 
3092 static void
3093 spdk_bdev_fini(struct spdk_bdev *bdev)
3094 {
3095 	pthread_mutex_destroy(&bdev->internal.mutex);
3096 
3097 	free(bdev->internal.qos);
3098 
3099 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3100 }
3101 
3102 static void
3103 spdk_bdev_start(struct spdk_bdev *bdev)
3104 {
3105 	struct spdk_bdev_module *module;
3106 	uint32_t action;
3107 
3108 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3109 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3110 
3111 	/* Examine configuration before initializing I/O */
3112 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3113 		if (module->examine_config) {
3114 			action = module->internal.action_in_progress;
3115 			module->internal.action_in_progress++;
3116 			module->examine_config(bdev);
3117 			if (action != module->internal.action_in_progress) {
3118 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3119 					    module->name);
3120 			}
3121 		}
3122 	}
3123 
3124 	if (bdev->internal.claim_module) {
3125 		return;
3126 	}
3127 
3128 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3129 		if (module->examine_disk) {
3130 			module->internal.action_in_progress++;
3131 			module->examine_disk(bdev);
3132 		}
3133 	}
3134 }
3135 
3136 int
3137 spdk_bdev_register(struct spdk_bdev *bdev)
3138 {
3139 	int rc = spdk_bdev_init(bdev);
3140 
3141 	if (rc == 0) {
3142 		spdk_bdev_start(bdev);
3143 	}
3144 
3145 	return rc;
3146 }
3147 
3148 int
3149 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3150 {
3151 	int rc;
3152 
3153 	rc = spdk_bdev_init(vbdev);
3154 	if (rc) {
3155 		return rc;
3156 	}
3157 
3158 	spdk_bdev_start(vbdev);
3159 	return 0;
3160 }
3161 
3162 void
3163 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3164 {
3165 	if (bdev->internal.unregister_cb != NULL) {
3166 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3167 	}
3168 }
3169 
3170 static void
3171 _remove_notify(void *arg)
3172 {
3173 	struct spdk_bdev_desc *desc = arg;
3174 
3175 	desc->remove_cb(desc->remove_ctx);
3176 }
3177 
3178 void
3179 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3180 {
3181 	struct spdk_bdev_desc	*desc, *tmp;
3182 	bool			do_destruct = true;
3183 	struct spdk_thread	*thread;
3184 
3185 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3186 
3187 	thread = spdk_get_thread();
3188 	if (!thread) {
3189 		/* The user called this from a non-SPDK thread. */
3190 		if (cb_fn != NULL) {
3191 			cb_fn(cb_arg, -ENOTSUP);
3192 		}
3193 		return;
3194 	}
3195 
3196 	pthread_mutex_lock(&bdev->internal.mutex);
3197 
3198 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3199 	bdev->internal.unregister_cb = cb_fn;
3200 	bdev->internal.unregister_ctx = cb_arg;
3201 
3202 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3203 		if (desc->remove_cb) {
3204 			do_destruct = false;
3205 			/*
3206 			 * Defer invocation of the remove_cb to a separate message that will
3207 			 *  run later on this thread.  This ensures this context unwinds and
3208 			 *  we don't recursively unregister this bdev again if the remove_cb
3209 			 *  immediately closes its descriptor.
3210 			 */
3211 			if (!desc->remove_scheduled) {
3212 				/* Avoid scheduling removal of the same descriptor multiple times. */
3213 				desc->remove_scheduled = true;
3214 				spdk_thread_send_msg(thread, _remove_notify, desc);
3215 			}
3216 		}
3217 	}
3218 
3219 	if (!do_destruct) {
3220 		pthread_mutex_unlock(&bdev->internal.mutex);
3221 		return;
3222 	}
3223 
3224 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3225 	pthread_mutex_unlock(&bdev->internal.mutex);
3226 
3227 	spdk_bdev_fini(bdev);
3228 }
3229 
3230 int
3231 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3232 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3233 {
3234 	struct spdk_bdev_desc *desc;
3235 
3236 	desc = calloc(1, sizeof(*desc));
3237 	if (desc == NULL) {
3238 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3239 		return -ENOMEM;
3240 	}
3241 
3242 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3243 		      spdk_get_thread());
3244 
3245 	pthread_mutex_lock(&bdev->internal.mutex);
3246 
3247 	if (write && bdev->internal.claim_module) {
3248 		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3249 			    bdev->name, bdev->internal.claim_module->name);
3250 		free(desc);
3251 		pthread_mutex_unlock(&bdev->internal.mutex);
3252 		return -EPERM;
3253 	}
3254 
3255 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3256 
3257 	desc->bdev = bdev;
3258 	desc->remove_cb = remove_cb;
3259 	desc->remove_ctx = remove_ctx;
3260 	desc->write = write;
3261 	*_desc = desc;
3262 
3263 	pthread_mutex_unlock(&bdev->internal.mutex);
3264 
3265 	return 0;
3266 }
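
/*
 * Usage sketch: open a bdev by name with a hot-remove callback and close it when
 * finished. The remove callback receives only the context pointer, so a handle
 * to the descriptor is typically stashed there; closing the descriptor from the
 * callback is safe because the notification is deferred (see
 * spdk_bdev_unregister() below). "Nvme0n1", 'hot_remove' and 'g_desc' are
 * illustrative.
 *
 *	static struct spdk_bdev_desc *g_desc;
 *
 *	static void
 *	hot_remove(void *remove_ctx)
 *	{
 *		struct spdk_bdev_desc **desc = remove_ctx;
 *
 *		// stop submitting I/O, then release the descriptor
 *		spdk_bdev_close(*desc);
 *		*desc = NULL;
 *	}
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *
 *	if (bdev != NULL && spdk_bdev_open(bdev, true, hot_remove, &g_desc, &g_desc) == 0) {
 *		// submit I/O through g_desc ...
 *	}
 */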
3267 
3268 void
3269 spdk_bdev_close(struct spdk_bdev_desc *desc)
3270 {
3271 	struct spdk_bdev *bdev = desc->bdev;
3272 	bool do_unregister = false;
3273 
3274 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3275 		      spdk_get_thread());
3276 
3277 	pthread_mutex_lock(&bdev->internal.mutex);
3278 
3279 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3280 	free(desc);
3281 
3282 	/* If no more descriptors, kill QoS channel */
3283 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3284 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3285 			      bdev->name, spdk_get_thread());
3286 
3287 		if (spdk_bdev_qos_destroy(bdev)) {
3288 			/* There isn't anything we can do to recover here. Just let the
3289 			 * old QoS poller keep running. The QoS handling won't change
3290 			 * cores when the user allocates a new channel, but it won't break. */
3291 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3292 		}
3293 	}
3294 
3295 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3296 
3297 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3298 		do_unregister = true;
3299 	}
3300 	pthread_mutex_unlock(&bdev->internal.mutex);
3301 
3302 	if (do_unregister == true) {
3303 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3304 	}
3305 }
3306 
3307 int
3308 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3309 			    struct spdk_bdev_module *module)
3310 {
3311 	if (bdev->internal.claim_module != NULL) {
3312 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3313 			    bdev->internal.claim_module->name);
3314 		return -EPERM;
3315 	}
3316 
3317 	if (desc && !desc->write) {
3318 		desc->write = true;
3319 	}
3320 
3321 	bdev->internal.claim_module = module;
3322 	return 0;
3323 }
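
/*
 * Usage sketch for virtual bdev modules: open the base bdev, then claim it so no
 * other writer can use it. Passing the descriptor upgrades it to a write
 * descriptor as a side effect. 'base_bdev', 'base_desc', 'base_remove_cb' and
 * 'my_if' (the module's struct spdk_bdev_module) are illustrative.
 *
 *	if (spdk_bdev_open(base_bdev, false, base_remove_cb, NULL, &base_desc) == 0) {
 *		if (spdk_bdev_module_claim_bdev(base_bdev, base_desc, &my_if) != 0) {
 *			spdk_bdev_close(base_desc);	// already claimed by another module
 *		}
 *	}
 *	// ...
 *	spdk_bdev_module_release_bdev(base_bdev);	// on teardown, before closing base_desc
 */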
3324 
3325 void
3326 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3327 {
3328 	assert(bdev->internal.claim_module != NULL);
3329 	bdev->internal.claim_module = NULL;
3330 }
3331 
3332 struct spdk_bdev *
3333 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3334 {
3335 	return desc->bdev;
3336 }
3337 
3338 void
3339 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3340 {
3341 	struct iovec *iovs;
3342 	int iovcnt;
3343 
3344 	if (bdev_io == NULL) {
3345 		return;
3346 	}
3347 
3348 	switch (bdev_io->type) {
3349 	case SPDK_BDEV_IO_TYPE_READ:
3350 		iovs = bdev_io->u.bdev.iovs;
3351 		iovcnt = bdev_io->u.bdev.iovcnt;
3352 		break;
3353 	case SPDK_BDEV_IO_TYPE_WRITE:
3354 		iovs = bdev_io->u.bdev.iovs;
3355 		iovcnt = bdev_io->u.bdev.iovcnt;
3356 		break;
3357 	default:
3358 		iovs = NULL;
3359 		iovcnt = 0;
3360 		break;
3361 	}
3362 
3363 	if (iovp) {
3364 		*iovp = iovs;
3365 	}
3366 	if (iovcntp) {
3367 		*iovcntp = iovcnt;
3368 	}
3369 }
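
/*
 * Usage sketch for a bdev module's submit path: retrieve the data buffers of a
 * read or write request in iovec form. For other I/O types the function reports
 * no iovecs (NULL/0), as implemented above.
 *
 *	struct iovec *iovs;
 *	int iovcnt;
 *
 *	spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
 *	if (iovcnt > 0) {
 *		// hand iovs/iovcnt to the backing device
 *	}
 */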
3370 
3371 void
3372 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3373 {
3374 
3375 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3376 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3377 		assert(false);
3378 	}
3379 
3380 	if (bdev_module->async_init) {
3381 		bdev_module->internal.action_in_progress = 1;
3382 	}
3383 
3384 	/*
3385 	 * Modules with examine callbacks must be initialized first, so they are
3386 	 *  ready to handle examine callbacks from later modules that will
3387 	 *  register physical bdevs.
3388 	 */
3389 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3390 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3391 	} else {
3392 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3393 	}
3394 }
3395 
3396 struct spdk_bdev_module *
3397 spdk_bdev_module_list_find(const char *name)
3398 {
3399 	struct spdk_bdev_module *bdev_module;
3400 
3401 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3402 		if (strcmp(name, bdev_module->name) == 0) {
3403 			break;
3404 		}
3405 	}
3406 
3407 	return bdev_module;
3408 }
3409 
3410 static void
3411 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
3412 {
3413 	struct spdk_bdev_io *bdev_io = _bdev_io;
3414 	uint64_t num_bytes, num_blocks;
3415 	int rc;
3416 
3417 	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
3418 			     bdev_io->u.bdev.split_remaining_num_blocks,
3419 			     ZERO_BUFFER_SIZE);
3420 	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
3421 
3422 	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
3423 				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
3424 				    g_bdev_mgr.zero_buffer,
3425 				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
3426 				    _spdk_bdev_write_zero_buffer_done, bdev_io);
3427 	if (rc == 0) {
3428 		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
3429 		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
3430 	} else if (rc == -ENOMEM) {
3431 		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
3432 		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next;
3433 		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
3434 		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
3435 					&bdev_io->internal.waitq_entry);
3436 	} else {
3437 		/* This should never happen. */
3438 		assert(false);
3439 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3440 		bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx);
3441 	}
3442 }
3443 
3444 static void
3445 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3446 {
3447 	struct spdk_bdev_io *parent_io = cb_arg;
3448 
3449 	if (!success) {
3450 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3451 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx);
3452 		return;
3453 	}
3454 
3455 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
3456 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3457 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx);
3458 		return;
3459 	}
3460 
3461 	_spdk_bdev_write_zero_buffer_next(parent_io);
3462 }
3463 
3464 struct set_qos_limit_ctx {
3465 	void (*cb_fn)(void *cb_arg, int status);
3466 	void *cb_arg;
3467 	struct spdk_bdev *bdev;
3468 };
3469 
3470 static void
3471 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3472 {
3473 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3474 	ctx->bdev->internal.qos_mod_in_progress = false;
3475 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3476 
3477 	ctx->cb_fn(ctx->cb_arg, status);
3478 	free(ctx);
3479 }
3480 
3481 static void
3482 _spdk_bdev_disable_qos_done(void *cb_arg)
3483 {
3484 	struct set_qos_limit_ctx *ctx = cb_arg;
3485 	struct spdk_bdev *bdev = ctx->bdev;
3486 	struct spdk_bdev_io *bdev_io;
3487 	struct spdk_bdev_qos *qos;
3488 
3489 	pthread_mutex_lock(&bdev->internal.mutex);
3490 	qos = bdev->internal.qos;
3491 	bdev->internal.qos = NULL;
3492 	pthread_mutex_unlock(&bdev->internal.mutex);
3493 
3494 	while (!TAILQ_EMPTY(&qos->queued)) {
3495 		/* Send queued I/O back to their original thread for resubmission. */
3496 		bdev_io = TAILQ_FIRST(&qos->queued);
3497 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3498 
3499 		if (bdev_io->internal.io_submit_ch) {
3500 			/*
3501 			 * Channel was changed when sending it to the QoS thread - change it back
3502 			 *  before sending it back to the original thread.
3503 			 */
3504 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3505 			bdev_io->internal.io_submit_ch = NULL;
3506 		}
3507 
3508 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3509 				     _spdk_bdev_io_submit, bdev_io);
3510 	}
3511 
3512 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3513 	spdk_poller_unregister(&qos->poller);
3514 
3515 	free(qos);
3516 
3517 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3518 }
3519 
3520 static void
3521 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3522 {
3523 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3524 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3525 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3526 	struct spdk_thread *thread;
3527 
3528 	pthread_mutex_lock(&bdev->internal.mutex);
3529 	thread = bdev->internal.qos->thread;
3530 	pthread_mutex_unlock(&bdev->internal.mutex);
3531 
3532 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3533 }
3534 
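/*
 * Per-channel step of disabling QoS: clear the flag so new I/O on this
 * channel is submitted directly instead of being redirected to the QoS thread.
 */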
3535 static void
3536 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3537 {
3538 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3539 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3540 
3541 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3542 
3543 	spdk_for_each_channel_continue(i, 0);
3544 }
3545 
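/*
 * Runs on the QoS thread when only the numeric limit changes; recomputes the
 * per-timeslice quota from the new iops_rate_limit and completes the request.
 */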
3546 static void
3547 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
3548 {
3549 	struct set_qos_limit_ctx *ctx = cb_arg;
3550 	struct spdk_bdev *bdev = ctx->bdev;
3551 
3552 	pthread_mutex_lock(&bdev->internal.mutex);
3553 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3554 	pthread_mutex_unlock(&bdev->internal.mutex);
3555 
3556 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3557 }
3558 
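/*
 * Per-channel step of enabling QoS: attach each existing channel to the
 * bdev's new QoS object under the bdev mutex.
 */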
3559 static void
3560 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3561 {
3562 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3563 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3564 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3565 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3566 
3567 	pthread_mutex_lock(&bdev->internal.mutex);
3568 	_spdk_bdev_enable_qos(bdev, bdev_ch);
3569 	pthread_mutex_unlock(&bdev->internal.mutex);
3570 	spdk_for_each_channel_continue(i, 0);
3571 }
3572 
3573 static void
3574 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3575 {
3576 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3577 
3578 	_spdk_bdev_set_qos_limit_done(ctx, status);
3579 }
3580 
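/*
 * Public entry point for changing the per-bdev read/write IOPS rate limit.
 * A non-zero ios_per_sec must be a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
 * passing 0 disables rate limiting. The callback receives -EAGAIN if another
 * QoS change is still in progress on this bdev.
 *
 * Illustrative usage (a sketch, not part of this file; my_qos_done is a
 * hypothetical caller-provided callback):
 *
 *	static void
 *	my_qos_done(void *cb_arg, int status)
 *	{
 *		SPDK_NOTICELOG("QoS limit change completed: %d\n", status);
 *	}
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, my_qos_done, NULL);
 */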
3581 void
3582 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
3583 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3584 {
3585 	struct set_qos_limit_ctx *ctx;
3586 
3587 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
3588 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
3589 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
3590 		cb_fn(cb_arg, -EINVAL);
3591 		return;
3592 	}
3593 
3594 	ctx = calloc(1, sizeof(*ctx));
3595 	if (ctx == NULL) {
3596 		cb_fn(cb_arg, -ENOMEM);
3597 		return;
3598 	}
3599 
3600 	ctx->cb_fn = cb_fn;
3601 	ctx->cb_arg = cb_arg;
3602 	ctx->bdev = bdev;
3603 
3604 	pthread_mutex_lock(&bdev->internal.mutex);
3605 	if (bdev->internal.qos_mod_in_progress) {
3606 		pthread_mutex_unlock(&bdev->internal.mutex);
3607 		free(ctx);
3608 		cb_fn(cb_arg, -EAGAIN);
3609 		return;
3610 	}
3611 	bdev->internal.qos_mod_in_progress = true;
3612 
3613 	if (ios_per_sec > 0) {
3614 		if (bdev->internal.qos == NULL) {
3615 			/* Enabling */
3616 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3617 			if (!bdev->internal.qos) {
3618 				pthread_mutex_unlock(&bdev->internal.mutex);
3619 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3620 				free(ctx);
3621 				cb_fn(cb_arg, -ENOMEM);
3622 				return;
3623 			}
3624 
3625 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3626 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3627 					      _spdk_bdev_enable_qos_msg, ctx,
3628 					      _spdk_bdev_enable_qos_done);
3629 		} else {
3630 			/* Updating */
3631 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3632 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3633 		}
3634 	} else {
3635 		if (bdev->internal.qos != NULL) {
3636 			/* Disabling */
3637 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3638 					      _spdk_bdev_disable_qos_msg, ctx,
3639 					      _spdk_bdev_disable_qos_msg_done);
3640 		} else {
3641 			pthread_mutex_unlock(&bdev->internal.mutex);
3642 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3643 			return;
3644 		}
3645 	}
3646 
3647 	pthread_mutex_unlock(&bdev->internal.mutex);
3648 }
3649 
3650 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3651 
3652 SPDK_TRACE_REGISTER_FN(bdev_trace)
3653 {
3654 	spdk_trace_register_owner(OWNER_BDEV, 'b');
3655 	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
3656 	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
3657 					OBJECT_BDEV_IO, 1, 0, 0, "type:   ");
3658 	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
3659 					OBJECT_BDEV_IO, 0, 0, 0, "");
3660 }
3661