xref: /spdk/lib/bdev/bdev.c (revision bcfd6d0fb47bc29d04d5948fa6f1a8bf8e7aa220)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk/bdev_module.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
69 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
70 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10
71 
72 enum spdk_bdev_qos_type {
73 	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
74 	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
75 	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
76 };
77 
78 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};
79 
80 struct spdk_bdev_mgr {
81 	struct spdk_mempool *bdev_io_pool;
82 
83 	struct spdk_mempool *buf_small_pool;
84 	struct spdk_mempool *buf_large_pool;
85 
86 	void *zero_buffer;
87 
88 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
89 
90 	TAILQ_HEAD(, spdk_bdev) bdevs;
91 
92 	bool init_complete;
93 	bool module_init_complete;
94 
95 #ifdef SPDK_CONFIG_VTUNE
96 	__itt_domain	*domain;
97 #endif
98 };
99 
100 static struct spdk_bdev_mgr g_bdev_mgr = {
101 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
102 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
103 	.init_complete = false,
104 	.module_init_complete = false,
105 };
106 
107 static struct spdk_bdev_opts	g_bdev_opts = {
108 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
109 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
110 };
111 
112 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
113 static void			*g_init_cb_arg = NULL;
114 
115 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
116 static void			*g_fini_cb_arg = NULL;
117 static struct spdk_thread	*g_fini_thread = NULL;
118 
119 struct spdk_bdev_qos {
120 	/** Rate limit, in I/O per second */
121 	uint64_t iops_rate_limit;
122 
123 	/** Rate limit, in byte per second */
124 	uint64_t byte_rate_limit;
125 
126 	/** The channel that all I/O are funneled through */
127 	struct spdk_bdev_channel *ch;
128 
129 	/** The thread on which the poller is running. */
130 	struct spdk_thread *thread;
131 
132 	/** Queue of I/O waiting to be issued. */
133 	bdev_io_tailq_t queued;
134 
135 	/** Maximum number of I/Os allowed to be issued in one timeslice (e.g., 1ms).
136 	 *  Only valid for the master channel, which manages the outstanding I/Os. */
137 	uint64_t max_ios_per_timeslice;
138 
139 	/** Maximum number of bytes allowed to be issued in one timeslice (e.g., 1ms).
140 	 *  Only valid for the master channel, which manages the outstanding I/Os. */
141 	uint64_t max_byte_per_timeslice;
142 
143 	/** I/Os submitted in the current timeslice (e.g., 1ms) */
144 	uint64_t io_submitted_this_timeslice;
145 
146 	/** Bytes submitted in the current timeslice (e.g., 1ms) */
147 	uint64_t byte_submitted_this_timeslice;
148 
149 	/** Poller that processes queued I/O commands each timeslice. */
150 	struct spdk_poller *poller;
151 };
152 
153 struct spdk_bdev_mgmt_channel {
154 	bdev_io_stailq_t need_buf_small;
155 	bdev_io_stailq_t need_buf_large;
156 
157 	/*
158 	 * Each thread keeps a cache of bdev_io - this allows
159 	 *  bdev threads which are *not* DPDK threads to still
160 	 *  benefit from a per-thread bdev_io cache.  Without
161 	 *  this, non-DPDK threads fetching from the mempool
162 	 *  incur a cmpxchg on get and put.
163 	 */
164 	bdev_io_stailq_t per_thread_cache;
165 	uint32_t	per_thread_cache_count;
166 	uint32_t	bdev_io_cache_size;
167 
168 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
169 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
170 };
171 
172 /*
173  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
174  * will queue their I/O awaiting retry here. This makes it possible to retry
175  * sending I/O to one bdev after I/O from another bdev completes.
176  */
177 struct spdk_bdev_shared_resource {
178 	/* The bdev management channel */
179 	struct spdk_bdev_mgmt_channel *mgmt_ch;
180 
181 	/*
182 	 * Count of I/O submitted to bdev module and waiting for completion.
183 	 * Incremented before submit_request() is called on an spdk_bdev_io.
184 	 */
185 	uint64_t		io_outstanding;
186 
187 	/*
188 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
189 	 *  on this channel.
190 	 */
191 	bdev_io_tailq_t		nomem_io;
192 
193 	/*
194 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
195 	 */
196 	uint64_t		nomem_threshold;
197 
198 	/* I/O channel allocated by a bdev module */
199 	struct spdk_io_channel	*shared_ch;
200 
201 	/* Refcount of bdev channels using this resource */
202 	uint32_t		ref;
203 
204 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
205 };
206 
207 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
208 #define BDEV_CH_QOS_ENABLED		(1 << 1)
209 
210 struct spdk_bdev_channel {
211 	struct spdk_bdev	*bdev;
212 
213 	/* The channel for the underlying device */
214 	struct spdk_io_channel	*channel;
215 
216 	/* Per io_device per thread data */
217 	struct spdk_bdev_shared_resource *shared_resource;
218 
219 	struct spdk_bdev_io_stat stat;
220 
221 	/*
222 	 * Count of I/O submitted through this channel and waiting for completion.
223 	 * Incremented before submit_request() is called on an spdk_bdev_io.
224 	 */
225 	uint64_t		io_outstanding;
226 
227 	bdev_io_tailq_t		queued_resets;
228 
229 	uint32_t		flags;
230 
231 #ifdef SPDK_CONFIG_VTUNE
232 	uint64_t		start_tsc;
233 	uint64_t		interval_tsc;
234 	__itt_string_handle	*handle;
235 	struct spdk_bdev_io_stat prev_stat;
236 #endif
237 
238 };
239 
240 struct spdk_bdev_desc {
241 	struct spdk_bdev		*bdev;
242 	spdk_bdev_remove_cb_t		remove_cb;
243 	void				*remove_ctx;
244 	bool				remove_scheduled;
245 	bool				write;
246 	TAILQ_ENTRY(spdk_bdev_desc)	link;
247 };
248 
249 struct spdk_bdev_iostat_ctx {
250 	struct spdk_bdev_io_stat *stat;
251 	spdk_bdev_get_device_stat_cb cb;
252 	void *cb_arg;
253 };
254 
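/*
 * Editor's note: the two macros below derive the io_device handle for a bdev
 * by offsetting the bdev pointer by one byte. This keeps the handle cheaply
 * reversible while (presumably) guaranteeing it can never collide with another
 * io_device registered at the bdev's own address, e.g. by a bdev module.
 */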
255 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
256 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
257 
258 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
259 
260 void
261 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
262 {
263 	*opts = g_bdev_opts;
264 }
265 
266 int
267 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
268 {
269 	uint32_t min_pool_size;
270 
271 	/*
272 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
273 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
274 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
275 	 */
276 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
277 	if (opts->bdev_io_pool_size < min_pool_size) {
278 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
279 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
280 			    spdk_thread_get_count());
281 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
282 		return -1;
283 	}
284 
285 	g_bdev_opts = *opts;
286 	return 0;
287 }
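/*
 * Usage sketch (editor's illustration, not part of the original file): an
 * application that wants a larger bdev_io pool reads, modifies, and writes the
 * global options back before spdk_bdev_initialize() runs. The doubled size is
 * an arbitrary example value.
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 2 * SPDK_BDEV_IO_POOL_SIZE;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		SPDK_ERRLOG("setting bdev opts failed\n");
 *	}
 */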
288 
289 struct spdk_bdev *
290 spdk_bdev_first(void)
291 {
292 	struct spdk_bdev *bdev;
293 
294 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
295 	if (bdev) {
296 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
297 	}
298 
299 	return bdev;
300 }
301 
302 struct spdk_bdev *
303 spdk_bdev_next(struct spdk_bdev *prev)
304 {
305 	struct spdk_bdev *bdev;
306 
307 	bdev = TAILQ_NEXT(prev, internal.link);
308 	if (bdev) {
309 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
310 	}
311 
312 	return bdev;
313 }
314 
315 static struct spdk_bdev *
316 _bdev_next_leaf(struct spdk_bdev *bdev)
317 {
318 	while (bdev != NULL) {
319 		if (bdev->internal.claim_module == NULL) {
320 			return bdev;
321 		} else {
322 			bdev = TAILQ_NEXT(bdev, internal.link);
323 		}
324 	}
325 
326 	return bdev;
327 }
328 
329 struct spdk_bdev *
330 spdk_bdev_first_leaf(void)
331 {
332 	struct spdk_bdev *bdev;
333 
334 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
335 
336 	if (bdev) {
337 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
338 	}
339 
340 	return bdev;
341 }
342 
343 struct spdk_bdev *
344 spdk_bdev_next_leaf(struct spdk_bdev *prev)
345 {
346 	struct spdk_bdev *bdev;
347 
348 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
349 
350 	if (bdev) {
351 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
352 	}
353 
354 	return bdev;
355 }
356 
357 struct spdk_bdev *
358 spdk_bdev_get_by_name(const char *bdev_name)
359 {
360 	struct spdk_bdev_alias *tmp;
361 	struct spdk_bdev *bdev = spdk_bdev_first();
362 
363 	while (bdev != NULL) {
364 		if (strcmp(bdev_name, bdev->name) == 0) {
365 			return bdev;
366 		}
367 
368 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
369 			if (strcmp(bdev_name, tmp->alias) == 0) {
370 				return bdev;
371 			}
372 		}
373 
374 		bdev = spdk_bdev_next(bdev);
375 	}
376 
377 	return NULL;
378 }
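/*
 * Usage sketch (editor's illustration): look a bdev up by name or alias, or
 * walk every registered bdev with the iterators above. "Malloc0" is a
 * hypothetical bdev name used only for illustration.
 *
 *	struct spdk_bdev *bdev;
 *
 *	bdev = spdk_bdev_get_by_name("Malloc0");	// NULL if no such name/alias
 *
 *	for (bdev = spdk_bdev_first(); bdev != NULL; bdev = spdk_bdev_next(bdev)) {
 *		printf("bdev: %s\n", spdk_bdev_get_name(bdev));
 *	}
 */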
379 
380 size_t
381 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
382 {
383 	struct iovec **iovs;
384 	int *iovcnt;
385 	void *aligned_buf;
386 
387 	iovs = &bdev_io->u.bdev.iovs;
388 	iovcnt = &bdev_io->u.bdev.iovcnt;
389 
390 	if (*iovs == NULL || *iovcnt == 0) {
391 		*iovs = &bdev_io->iov;
392 		*iovcnt = 1;
393 	}
394 
395 	if (buf != NULL) {
396 		aligned_buf = (void *)(((uintptr_t)buf + 512) & ~511UL);
397 		len = len - ((uintptr_t)aligned_buf - (uintptr_t)buf);
398 	} else {
399 		aligned_buf = NULL;
400 		assert(len == 0);
401 	}
402 
403 	(*iovs)[0].iov_base = aligned_buf;
404 	(*iovs)[0].iov_len = len;
405 
406 	return len;
407 }
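/*
 * Worked example (editor's illustration): with buf = 0x1000 and len = 8704,
 * aligned_buf = (0x1000 + 512) & ~511 = 0x1200 and len shrinks by the 0x200
 * bytes skipped, to 8192. Note that "+ 512" (rather than "+ 511") always
 * advances the pointer, even when buf is already 512-byte aligned, so callers
 * must provide 512 bytes of slack - which is why the buffer pools below are
 * created with element size SPDK_BDEV_*_BUF_MAX_SIZE + 512.
 */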
408 
409 static void
410 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
411 {
412 	struct spdk_mempool *pool;
413 	struct spdk_bdev_io *tmp;
414 	void *buf;
415 	bdev_io_stailq_t *stailq;
416 	struct spdk_bdev_mgmt_channel *ch;
417 	size_t len;
418 
419 	assert(bdev_io->u.bdev.iovcnt == 1);
420 
421 	buf = bdev_io->internal.buf;
422 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
423 
424 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
425 		pool = g_bdev_mgr.buf_small_pool;
426 		stailq = &ch->need_buf_small;
427 		len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512;
428 	} else {
429 		pool = g_bdev_mgr.buf_large_pool;
430 		stailq = &ch->need_buf_large;
431 		len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512;
432 	}
433 
434 	if (STAILQ_EMPTY(stailq)) {
435 		spdk_mempool_put(pool, buf);
436 	} else {
437 		tmp = STAILQ_FIRST(stailq);
438 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
439 		len = spdk_bdev_io_set_buf(tmp, buf, len);
440 		if (len < tmp->internal.buf_len) {
441 			SPDK_ERRLOG("Unable to use buffer due to alignment\n");
442 			spdk_mempool_put(pool, buf);
443 			spdk_bdev_io_set_buf(tmp, NULL, 0);
444 			return;
445 		}
446 		tmp->internal.buf = buf;
447 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
448 	}
449 }
450 
451 void
452 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
453 {
454 	struct spdk_mempool *pool;
455 	bdev_io_stailq_t *stailq;
456 	void *buf = NULL;
457 	struct spdk_bdev_mgmt_channel *mgmt_ch;
458 	size_t buf_len;
459 
460 	assert(cb != NULL);
461 	assert(bdev_io->u.bdev.iovs != NULL);
462 
463 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
464 		/* Buffer already present */
465 		cb(bdev_io->internal.ch->channel, bdev_io);
466 		return;
467 	}
468 
469 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
470 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
471 
472 	bdev_io->internal.buf_len = len;
473 	bdev_io->internal.get_buf_cb = cb;
474 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
475 		pool = g_bdev_mgr.buf_small_pool;
476 		stailq = &mgmt_ch->need_buf_small;
477 		buf_len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512;
478 	} else {
479 		pool = g_bdev_mgr.buf_large_pool;
480 		stailq = &mgmt_ch->need_buf_large;
481 		buf_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512;
482 	}
483 
484 	buf = spdk_mempool_get(pool);
485 
486 	if (!buf) {
487 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
488 	} else {
489 		size_t aligned_len;
490 
491 		aligned_len = spdk_bdev_io_set_buf(bdev_io, buf, buf_len);
492 		if (aligned_len < len) {
493 			SPDK_ERRLOG("Unable to use buffer after alignment calculations.\n");
494 			spdk_mempool_put(pool, buf);
495 			spdk_bdev_io_set_buf(bdev_io, NULL, 0);
496 			STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
497 			return;
498 		}
499 
500 		bdev_io->internal.buf = buf;
501 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
502 	}
503 }
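/*
 * Usage sketch (editor's illustration): a bdev module that needs a data buffer
 * for a READ defers allocation to this helper from its submit_request() path;
 * my_read_cb is a hypothetical continuation.
 *
 *	static void
 *	my_read_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0] now points at an aligned buffer
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_read_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 */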
504 
505 static int
506 spdk_bdev_module_get_max_ctx_size(void)
507 {
508 	struct spdk_bdev_module *bdev_module;
509 	int max_bdev_module_size = 0;
510 
511 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
512 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
513 			max_bdev_module_size = bdev_module->get_ctx_size();
514 		}
515 	}
516 
517 	return max_bdev_module_size;
518 }
519 
520 void
521 spdk_bdev_config_text(FILE *fp)
522 {
523 	struct spdk_bdev_module *bdev_module;
524 
525 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
526 		if (bdev_module->config_text) {
527 			bdev_module->config_text(fp);
528 		}
529 	}
530 }
531 
532 void
533 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
534 {
535 	struct spdk_bdev_module *bdev_module;
536 	struct spdk_bdev *bdev;
537 
538 	assert(w != NULL);
539 
540 	spdk_json_write_array_begin(w);
541 
542 	spdk_json_write_object_begin(w);
543 	spdk_json_write_named_string(w, "method", "set_bdev_options");
544 	spdk_json_write_name(w, "params");
545 	spdk_json_write_object_begin(w);
546 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
547 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
548 	spdk_json_write_object_end(w);
549 	spdk_json_write_object_end(w);
550 
551 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
552 		if (bdev_module->config_json) {
553 			bdev_module->config_json(w);
554 		}
555 	}
556 
557 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
558 		spdk_bdev_config_json(bdev, w);
559 	}
560 
561 	spdk_json_write_array_end(w);
562 }
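/*
 * Editor's note: the array written above begins with the subsystem options;
 * with the defaults in g_bdev_opts it looks roughly like
 *
 *	{ "method": "set_bdev_options",
 *	  "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 } }
 *
 * followed by one entry per module config and one per registered bdev.
 */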
563 
564 static int
565 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
566 {
567 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
568 	struct spdk_bdev_io *bdev_io;
569 	uint32_t i;
570 
571 	STAILQ_INIT(&ch->need_buf_small);
572 	STAILQ_INIT(&ch->need_buf_large);
573 
574 	STAILQ_INIT(&ch->per_thread_cache);
575 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
576 
577 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
578 	ch->per_thread_cache_count = 0;
579 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
580 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
581 		assert(bdev_io != NULL);
582 		ch->per_thread_cache_count++;
583 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
584 	}
585 
586 	TAILQ_INIT(&ch->shared_resources);
587 	TAILQ_INIT(&ch->io_wait_queue);
588 
589 	return 0;
590 }
591 
592 static void
593 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
594 {
595 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
596 	struct spdk_bdev_io *bdev_io;
597 
598 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
599 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
600 	}
601 
602 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
603 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
604 	}
605 
606 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
607 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
608 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
609 		ch->per_thread_cache_count--;
610 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
611 	}
612 
613 	assert(ch->per_thread_cache_count == 0);
614 }
615 
616 static void
617 spdk_bdev_init_complete(int rc)
618 {
619 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
620 	void *cb_arg = g_init_cb_arg;
621 	struct spdk_bdev_module *m;
622 
623 	g_bdev_mgr.init_complete = true;
624 	g_init_cb_fn = NULL;
625 	g_init_cb_arg = NULL;
626 
627 	/*
628 	 * For modules that need to know when subsystem init is complete,
629 	 * inform them now.
630 	 */
631 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
632 		if (m->init_complete) {
633 			m->init_complete();
634 		}
635 	}
636 
637 	cb_fn(cb_arg, rc);
638 }
639 
640 static void
641 spdk_bdev_module_action_complete(void)
642 {
643 	struct spdk_bdev_module *m;
644 
645 	/*
646 	 * Don't finish bdev subsystem initialization if
647 	 * module pre-initialization is still in progress, or
648  * the subsystem has already been initialized.
649 	 */
650 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
651 		return;
652 	}
653 
654 	/*
655 	 * Check all bdev modules for inits/examinations in progress. If any
656 	 * exist, return immediately since we cannot finish bdev subsystem
657 	 * initialization until all are completed.
658 	 */
659 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
660 		if (m->internal.action_in_progress > 0) {
661 			return;
662 		}
663 	}
664 
665 	/*
666 	 * Modules already finished initialization - now that all
667 	 * the bdev modules have finished their asynchronous I/O
668 	 * processing, the entire bdev layer can be marked as complete.
669 	 */
670 	spdk_bdev_init_complete(0);
671 }
672 
673 static void
674 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
675 {
676 	assert(module->internal.action_in_progress > 0);
677 	module->internal.action_in_progress--;
678 	spdk_bdev_module_action_complete();
679 }
680 
681 void
682 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
683 {
684 	spdk_bdev_module_action_done(module);
685 }
686 
687 void
688 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
689 {
690 	spdk_bdev_module_action_done(module);
691 }
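/*
 * Editor's note: a module whose init or examine work completes asynchronously
 * holds off subsystem initialization by keeping internal.action_in_progress
 * non-zero; calling either wrapper above decrements the count and re-checks
 * whether the whole bdev layer can now be marked complete.
 */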
692 
693 static int
694 spdk_bdev_modules_init(void)
695 {
696 	struct spdk_bdev_module *module;
697 	int rc = 0;
698 
699 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
700 		rc = module->module_init();
701 		if (rc != 0) {
702 			break;
703 		}
704 	}
705 
706 	g_bdev_mgr.module_init_complete = true;
707 	return rc;
708 }
709 
710 void
711 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
712 {
713 	struct spdk_conf_section *sp;
714 	struct spdk_bdev_opts bdev_opts;
715 	int32_t bdev_io_pool_size, bdev_io_cache_size;
716 	int cache_size;
717 	int rc = 0;
718 	char mempool_name[32];
719 
720 	assert(cb_fn != NULL);
721 
722 	sp = spdk_conf_find_section(NULL, "Bdev");
723 	if (sp != NULL) {
724 		spdk_bdev_get_opts(&bdev_opts);
725 
726 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
727 		if (bdev_io_pool_size >= 0) {
728 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
729 		}
730 
731 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
732 		if (bdev_io_cache_size >= 0) {
733 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
734 		}
735 
736 		if (spdk_bdev_set_opts(&bdev_opts)) {
737 			spdk_bdev_init_complete(-1);
738 			return;
739 		}
740 
741 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
742 	}
743 
744 	g_init_cb_fn = cb_fn;
745 	g_init_cb_arg = cb_arg;
746 
747 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
748 
749 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
750 				  g_bdev_opts.bdev_io_pool_size,
751 				  sizeof(struct spdk_bdev_io) +
752 				  spdk_bdev_module_get_max_ctx_size(),
753 				  0,
754 				  SPDK_ENV_SOCKET_ID_ANY);
755 
756 	if (g_bdev_mgr.bdev_io_pool == NULL) {
757 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
758 		spdk_bdev_init_complete(-1);
759 		return;
760 	}
761 
762 	/**
763 	 * Ensure no more than half of the total buffers end up in local caches, by
764 	 *   using spdk_thread_get_count() to determine how many local caches we need
765 	 *   to account for.
766 	 */
767 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
768 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
769 
770 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
771 				    BUF_SMALL_POOL_SIZE,
772 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
773 				    cache_size,
774 				    SPDK_ENV_SOCKET_ID_ANY);
775 	if (!g_bdev_mgr.buf_small_pool) {
776 		SPDK_ERRLOG("create rbuf small pool failed\n");
777 		spdk_bdev_init_complete(-1);
778 		return;
779 	}
780 
781 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
782 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
783 
784 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
785 				    BUF_LARGE_POOL_SIZE,
786 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
787 				    cache_size,
788 				    SPDK_ENV_SOCKET_ID_ANY);
789 	if (!g_bdev_mgr.buf_large_pool) {
790 		SPDK_ERRLOG("create rbuf large pool failed\n");
791 		spdk_bdev_init_complete(-1);
792 		return;
793 	}
794 
795 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
796 				 NULL);
797 	if (!g_bdev_mgr.zero_buffer) {
798 		SPDK_ERRLOG("create bdev zero buffer failed\n");
799 		spdk_bdev_init_complete(-1);
800 		return;
801 	}
802 
803 #ifdef SPDK_CONFIG_VTUNE
804 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
805 #endif
806 
807 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
808 				spdk_bdev_mgmt_channel_destroy,
809 				sizeof(struct spdk_bdev_mgmt_channel));
810 
811 	rc = spdk_bdev_modules_init();
812 	if (rc != 0) {
813 		SPDK_ERRLOG("bdev modules init failed\n");
814 		spdk_bdev_init_complete(-1);
815 		return;
816 	}
817 
818 	spdk_bdev_module_action_complete();
819 }
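/*
 * Usage sketch (editor's illustration): the event framework's bdev subsystem
 * normally drives initialization, but a standalone application running on an
 * SPDK thread could do the following; bdev_init_done is a hypothetical name.
 *
 *	static void
 *	bdev_init_done(void *cb_arg, int rc)
 *	{
 *		// rc == 0 on success; bdevs may be opened from here on
 *	}
 *
 *	spdk_bdev_initialize(bdev_init_done, NULL);
 */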
820 
821 static void
822 spdk_bdev_mgr_unregister_cb(void *io_device)
823 {
824 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
825 
826 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
827 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
828 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
829 			    g_bdev_opts.bdev_io_pool_size);
830 	}
831 
832 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
833 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
834 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
835 			    BUF_SMALL_POOL_SIZE);
836 		assert(false);
837 	}
838 
839 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
840 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
841 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
842 			    BUF_LARGE_POOL_SIZE);
843 		assert(false);
844 	}
845 
846 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
847 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
848 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
849 	spdk_dma_free(g_bdev_mgr.zero_buffer);
850 
851 	cb_fn(g_fini_cb_arg);
852 	g_fini_cb_fn = NULL;
853 	g_fini_cb_arg = NULL;
854 }
855 
856 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
857 
858 static void
859 spdk_bdev_module_finish_iter(void *arg)
860 {
861 	struct spdk_bdev_module *bdev_module;
862 
863 	/* Start iterating from the last touched module */
864 	if (!g_resume_bdev_module) {
865 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
866 	} else {
867 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
868 	}
869 
870 	while (bdev_module) {
871 		if (bdev_module->async_fini) {
872 			/* Save our place so we can resume later. We must
873 			 * save the variable here, before calling module_fini()
874 			 * below, because in some cases the module may immediately
875 			 * call spdk_bdev_module_finish_done() and re-enter
876 			 * this function to continue iterating. */
877 			g_resume_bdev_module = bdev_module;
878 		}
879 
880 		if (bdev_module->module_fini) {
881 			bdev_module->module_fini();
882 		}
883 
884 		if (bdev_module->async_fini) {
885 			return;
886 		}
887 
888 		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
889 	}
890 
891 	g_resume_bdev_module = NULL;
892 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
893 }
894 
895 void
896 spdk_bdev_module_finish_done(void)
897 {
898 	if (spdk_get_thread() != g_fini_thread) {
899 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
900 	} else {
901 		spdk_bdev_module_finish_iter(NULL);
902 	}
903 }
904 
905 static void
906 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
907 {
908 	struct spdk_bdev *bdev = cb_arg;
909 
910 	if (bdeverrno && bdev) {
911 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
912 			     bdev->name);
913 
914 		/*
915 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
916 		 *  bdev; try to recover by manually removing this bdev from the list and
917 		 *  continuing with the next bdev in the list.
918 		 */
919 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
920 	}
921 
922 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
923 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
924 		/*
925 		 * Bdev module finish needs to be deferred, as we might be in the middle of some context
926 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
927 		 * after returning.
928 		 */
929 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
930 		return;
931 	}
932 
933 	/*
934 	 * Unregister the first bdev in the list.
935 	 *
936 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
937 	 *  calling the remove_cb of the descriptors first.
938 	 *
939 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
940 	 *  will be called again via the unregister completion callback to continue the cleanup
941 	 *  process with the next bdev.
942 	 */
943 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
944 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
945 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
946 }
947 
948 void
949 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
950 {
951 	assert(cb_fn != NULL);
952 
953 	g_fini_thread = spdk_get_thread();
954 
955 	g_fini_cb_fn = cb_fn;
956 	g_fini_cb_arg = cb_arg;
957 
958 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
959 }
960 
961 static struct spdk_bdev_io *
962 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
963 {
964 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
965 	struct spdk_bdev_io *bdev_io;
966 
967 	if (ch->per_thread_cache_count > 0) {
968 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
969 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
970 		ch->per_thread_cache_count--;
971 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
972 		/*
973 		 * Don't try to look for bdev_ios in the global pool if there are
974 		 * waiters on bdev_ios - we don't want this caller to jump the line.
975 		 */
976 		bdev_io = NULL;
977 	} else {
978 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
979 	}
980 
981 	return bdev_io;
982 }
983 
984 void
985 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
986 {
987 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
988 
989 	assert(bdev_io != NULL);
990 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
991 
992 	if (bdev_io->internal.buf != NULL) {
993 		spdk_bdev_io_put_buf(bdev_io);
994 	}
995 
996 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
997 		ch->per_thread_cache_count++;
998 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
999 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1000 			struct spdk_bdev_io_wait_entry *entry;
1001 
1002 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1003 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1004 			entry->cb_fn(entry->cb_arg);
1005 		}
1006 	} else {
1007 		/* We should never have a full cache with entries on the io wait queue. */
1008 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1009 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1010 	}
1011 }
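/*
 * Usage sketch (editor's illustration): when a submit call returns -ENOMEM
 * because no bdev_io could be allocated, the caller can park an
 * spdk_bdev_io_wait_entry on the channel and is called back from the loop
 * above once an I/O is freed. retry_read and ctx are hypothetical, and the
 * queueing API lives elsewhere in the bdev layer.
 *
 *	struct spdk_bdev_io_wait_entry entry;
 *
 *	entry.bdev = bdev;
 *	entry.cb_fn = retry_read;
 *	entry.cb_arg = ctx;
 *	spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 */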
1012 
1013 static uint64_t
1014 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1015 {
1016 	struct spdk_bdev	*bdev = bdev_io->bdev;
1017 
1018 	switch (bdev_io->type) {
1019 	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
1020 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1021 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1022 		return bdev_io->u.nvme_passthru.nbytes;
1023 	case SPDK_BDEV_IO_TYPE_READ:
1024 	case SPDK_BDEV_IO_TYPE_WRITE:
1025 	case SPDK_BDEV_IO_TYPE_UNMAP:
1026 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1027 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1028 	default:
1029 		return 0;
1030 	}
1031 }
1032 
1033 static void
1034 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
1035 {
1036 	struct spdk_bdev_io		*bdev_io = NULL;
1037 	struct spdk_bdev		*bdev = ch->bdev;
1038 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
1039 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1040 
1041 	while (!TAILQ_EMPTY(&qos->queued)) {
1042 		if (qos->max_ios_per_timeslice > 0 &&
1043 		    qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) {
1044 			break;
1045 		}
1046 
1047 		if (qos->max_byte_per_timeslice > 0 &&
1048 		    qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) {
1049 			break;
1050 		}
1051 
1052 		bdev_io = TAILQ_FIRST(&qos->queued);
1053 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1054 		qos->io_submitted_this_timeslice++;
1055 		qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io);
1056 		ch->io_outstanding++;
1057 		shared_resource->io_outstanding++;
1058 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1059 	}
1060 }
1061 
1062 static void
1063 _spdk_bdev_io_submit(void *ctx)
1064 {
1065 	struct spdk_bdev_io *bdev_io = ctx;
1066 	struct spdk_bdev *bdev = bdev_io->bdev;
1067 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1068 	struct spdk_io_channel *ch = bdev_ch->channel;
1069 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1070 
1071 	bdev_io->internal.submit_tsc = spdk_get_ticks();
1072 	bdev_ch->io_outstanding++;
1073 	shared_resource->io_outstanding++;
1074 	bdev_io->internal.in_submit_request = true;
1075 	if (spdk_likely(bdev_ch->flags == 0)) {
1076 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1077 			bdev->fn_table->submit_request(ch, bdev_io);
1078 		} else {
1079 			bdev_ch->io_outstanding--;
1080 			shared_resource->io_outstanding--;
1081 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1082 		}
1083 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1084 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1085 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1086 		bdev_ch->io_outstanding--;
1087 		shared_resource->io_outstanding--;
1088 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1089 		_spdk_bdev_qos_io_submit(bdev_ch);
1090 	} else {
1091 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1092 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1093 	}
1094 	bdev_io->internal.in_submit_request = false;
1095 }
1096 
1097 static void
1098 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1099 {
1100 	struct spdk_bdev *bdev = bdev_io->bdev;
1101 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1102 
1103 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1104 
1105 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1106 		if (thread == bdev->internal.qos->thread) {
1107 			_spdk_bdev_io_submit(bdev_io);
1108 		} else {
1109 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1110 			bdev_io->internal.ch = bdev->internal.qos->ch;
1111 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1112 		}
1113 	} else {
1114 		_spdk_bdev_io_submit(bdev_io);
1115 	}
1116 }
1117 
1118 static void
1119 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1120 {
1121 	struct spdk_bdev *bdev = bdev_io->bdev;
1122 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1123 	struct spdk_io_channel *ch = bdev_ch->channel;
1124 
1125 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1126 
1127 	bdev_io->internal.in_submit_request = true;
1128 	bdev->fn_table->submit_request(ch, bdev_io);
1129 	bdev_io->internal.in_submit_request = false;
1130 }
1131 
1132 static void
1133 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1134 		  struct spdk_bdev *bdev, void *cb_arg,
1135 		  spdk_bdev_io_completion_cb cb)
1136 {
1137 	bdev_io->bdev = bdev;
1138 	bdev_io->internal.caller_ctx = cb_arg;
1139 	bdev_io->internal.cb = cb;
1140 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1141 	bdev_io->internal.in_submit_request = false;
1142 	bdev_io->internal.buf = NULL;
1143 	bdev_io->internal.io_submit_ch = NULL;
1144 }
1145 
1146 static bool
1147 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1148 {
1149 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1150 }
1151 
1152 bool
1153 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1154 {
1155 	bool supported;
1156 
1157 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1158 
1159 	if (!supported) {
1160 		switch (io_type) {
1161 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1162 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1163 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1164 			break;
1165 		default:
1166 			break;
1167 		}
1168 	}
1169 
1170 	return supported;
1171 }
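/*
 * Usage sketch (editor's illustration): callers probe capabilities before
 * issuing optional I/O types.
 *
 *	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
 *		// safe to submit UNMAP requests to this bdev
 *	}
 *
 * Note that WRITE_ZEROES reports as supported whenever plain WRITE is, because
 * the bdev layer emulates it (see spdk_bdev_write_zeroes_blocks() below).
 */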
1172 
1173 int
1174 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1175 {
1176 	if (bdev->fn_table->dump_info_json) {
1177 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1178 	}
1179 
1180 	return 0;
1181 }
1182 
1183 void
1184 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1185 {
1186 	assert(bdev != NULL);
1187 	assert(w != NULL);
1188 
1189 	if (bdev->fn_table->write_config_json) {
1190 		bdev->fn_table->write_config_json(bdev, w);
1191 	} else {
1192 		spdk_json_write_object_begin(w);
1193 		spdk_json_write_named_string(w, "name", bdev->name);
1194 		spdk_json_write_object_end(w);
1195 	}
1196 }
1197 
1198 static void
1199 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1200 {
1201 	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;
1202 
1203 	if (qos->iops_rate_limit > 0) {
1204 		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1205 					SPDK_BDEV_SEC_TO_USEC;
1206 		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
1207 						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1208 	}
1209 
1210 	if (qos->byte_rate_limit > 0) {
1211 		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1212 					 SPDK_BDEV_SEC_TO_USEC;
1213 		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
1214 						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
1215 	}
1216 }
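/*
 * Worked example (editor's illustration): with iops_rate_limit = 10000 and the
 * 1000us timeslice, max_ios_per_timeslice = 10000 * 1000 / 1000000 = 10 I/Os
 * per slice. A byte_rate_limit of 10 MiB/s (10485760 bytes/s) likewise yields
 * 10485760 * 1000 / 1000000 = 10485 bytes per slice.
 */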
1217 
1218 static int
1219 spdk_bdev_channel_poll_qos(void *arg)
1220 {
1221 	struct spdk_bdev_qos *qos = arg;
1222 
1223 	/* Reset for next round of rate limiting */
1224 	qos->io_submitted_this_timeslice = 0;
1225 	qos->byte_submitted_this_timeslice = 0;
1226 
1227 	_spdk_bdev_qos_io_submit(qos->ch);
1228 
1229 	return -1;
1230 }
1231 
1232 static void
1233 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1234 {
1235 	struct spdk_bdev_shared_resource *shared_resource;
1236 
1237 	if (!ch) {
1238 		return;
1239 	}
1240 
1241 	if (ch->channel) {
1242 		spdk_put_io_channel(ch->channel);
1243 	}
1244 
1245 	assert(ch->io_outstanding == 0);
1246 
1247 	shared_resource = ch->shared_resource;
1248 	if (shared_resource) {
1249 		assert(ch->io_outstanding == 0);
1250 		assert(shared_resource->ref > 0);
1251 		shared_resource->ref--;
1252 		if (shared_resource->ref == 0) {
1253 			assert(shared_resource->io_outstanding == 0);
1254 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1255 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1256 			free(shared_resource);
1257 		}
1258 	}
1259 }
1260 
1261 /* Caller must hold bdev->internal.mutex. */
1262 static int
1263 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1264 {
1265 	struct spdk_bdev_qos *qos = bdev->internal.qos;
1266 
1267 	/* Rate limiting is enabled on this bdev */
1268 	if (qos) {
1269 		if (qos->ch == NULL) {
1270 			struct spdk_io_channel *io_ch;
1271 
1272 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1273 				      bdev->name, spdk_get_thread());
1274 
1275 			/* No qos channel has been selected, so set one up */
1276 
1277 			/* Take another reference to ch */
1278 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1279 			qos->ch = ch;
1280 
1281 			qos->thread = spdk_io_channel_get_thread(io_ch);
1282 
1283 			TAILQ_INIT(&qos->queued);
1284 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1285 			qos->io_submitted_this_timeslice = 0;
1286 			qos->byte_submitted_this_timeslice = 0;
1287 
1288 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1289 							   qos,
1290 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1291 		}
1292 
1293 		ch->flags |= BDEV_CH_QOS_ENABLED;
1294 	}
1295 
1296 	return 0;
1297 }
1298 
1299 static int
1300 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1301 {
1302 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1303 	struct spdk_bdev_channel	*ch = ctx_buf;
1304 	struct spdk_io_channel		*mgmt_io_ch;
1305 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1306 	struct spdk_bdev_shared_resource *shared_resource;
1307 
1308 	ch->bdev = bdev;
1309 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1310 	if (!ch->channel) {
1311 		return -1;
1312 	}
1313 
1314 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1315 	if (!mgmt_io_ch) {
1316 		return -1;
1317 	}
1318 
1319 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1320 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1321 		if (shared_resource->shared_ch == ch->channel) {
1322 			spdk_put_io_channel(mgmt_io_ch);
1323 			shared_resource->ref++;
1324 			break;
1325 		}
1326 	}
1327 
1328 	if (shared_resource == NULL) {
1329 		shared_resource = calloc(1, sizeof(*shared_resource));
1330 		if (shared_resource == NULL) {
1331 			spdk_put_io_channel(mgmt_io_ch);
1332 			return -1;
1333 		}
1334 
1335 		shared_resource->mgmt_ch = mgmt_ch;
1336 		shared_resource->io_outstanding = 0;
1337 		TAILQ_INIT(&shared_resource->nomem_io);
1338 		shared_resource->nomem_threshold = 0;
1339 		shared_resource->shared_ch = ch->channel;
1340 		shared_resource->ref = 1;
1341 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1342 	}
1343 
1344 	memset(&ch->stat, 0, sizeof(ch->stat));
1345 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1346 	ch->io_outstanding = 0;
1347 	TAILQ_INIT(&ch->queued_resets);
1348 	ch->flags = 0;
1349 	ch->shared_resource = shared_resource;
1350 
1351 #ifdef SPDK_CONFIG_VTUNE
1352 	{
1353 		char *name;
1354 		__itt_init_ittlib(NULL, 0);
1355 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1356 		if (!name) {
1357 			_spdk_bdev_channel_destroy_resource(ch);
1358 			return -1;
1359 		}
1360 		ch->handle = __itt_string_handle_create(name);
1361 		free(name);
1362 		ch->start_tsc = spdk_get_ticks();
1363 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1364 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1365 	}
1366 #endif
1367 
1368 	pthread_mutex_lock(&bdev->internal.mutex);
1369 
1370 	if (_spdk_bdev_enable_qos(bdev, ch)) {
1371 		_spdk_bdev_channel_destroy_resource(ch);
1372 		pthread_mutex_unlock(&bdev->internal.mutex);
1373 		return -1;
1374 	}
1375 
1376 	pthread_mutex_unlock(&bdev->internal.mutex);
1377 
1378 	return 0;
1379 }
1380 
1381 /*
1382  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1383  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1384  */
1385 static void
1386 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1387 {
1388 	bdev_io_stailq_t tmp;
1389 	struct spdk_bdev_io *bdev_io;
1390 
1391 	STAILQ_INIT(&tmp);
1392 
1393 	while (!STAILQ_EMPTY(queue)) {
1394 		bdev_io = STAILQ_FIRST(queue);
1395 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1396 		if (bdev_io->internal.ch == ch) {
1397 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1398 		} else {
1399 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1400 		}
1401 	}
1402 
1403 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1404 }
1405 
1406 /*
1407  * Abort I/O that are queued waiting for submission.  These types of I/O are
1408  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1409  */
1410 static void
1411 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1412 {
1413 	struct spdk_bdev_io *bdev_io, *tmp;
1414 
1415 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1416 		if (bdev_io->internal.ch == ch) {
1417 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1418 			/*
1419 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1420 			 *  been submitted to the bdev module.  Since in this case it
1421 			 *  hadn't, bump io_outstanding to account for the decrement
1422 			 *  that spdk_bdev_io_complete() will do.
1423 			 */
1424 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1425 				ch->io_outstanding++;
1426 				ch->shared_resource->io_outstanding++;
1427 			}
1428 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1429 		}
1430 	}
1431 }
1432 
1433 static void
1434 spdk_bdev_qos_channel_destroy(void *cb_arg)
1435 {
1436 	struct spdk_bdev_qos *qos = cb_arg;
1437 
1438 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1439 	spdk_poller_unregister(&qos->poller);
1440 
1441 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1442 
1443 	free(qos);
1444 }
1445 
1446 static int
1447 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1448 {
1449 	/*
1450 	 * Cleanly shutting down the QoS poller is tricky, because
1451 	 * during the asynchronous operation the user could open
1452 	 * a new descriptor and create a new channel, spawning
1453 	 * a new QoS poller.
1454 	 *
1455 	 * The strategy is to create a new QoS structure here and swap it
1456 	 * in. The shutdown path then continues to refer to the old one
1457 	 * until it completes and then releases it.
1458 	 */
1459 	struct spdk_bdev_qos *new_qos, *old_qos;
1460 
1461 	old_qos = bdev->internal.qos;
1462 
1463 	new_qos = calloc(1, sizeof(*new_qos));
1464 	if (!new_qos) {
1465 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1466 		return -ENOMEM;
1467 	}
1468 
1469 	/* Copy the old QoS data into the newly allocated structure */
1470 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1471 
1472 	/* Zero out the key parts of the QoS structure */
1473 	new_qos->ch = NULL;
1474 	new_qos->thread = NULL;
1475 	new_qos->max_ios_per_timeslice = 0;
1476 	new_qos->max_byte_per_timeslice = 0;
1477 	new_qos->io_submitted_this_timeslice = 0;
1478 	new_qos->byte_submitted_this_timeslice = 0;
1479 	new_qos->poller = NULL;
1480 	TAILQ_INIT(&new_qos->queued);
1481 
1482 	bdev->internal.qos = new_qos;
1483 
1484 	spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1485 			     old_qos);
1486 
1487 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1488 	 * been destroyed yet. The destruction path will end up waiting for the final
1489 	 * channel to be put before it releases resources. */
1490 
1491 	return 0;
1492 }
1493 
1494 static void
1495 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1496 {
1497 	struct spdk_bdev_channel	*ch = ctx_buf;
1498 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1499 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1500 
1501 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1502 		      spdk_get_thread());
1503 
1504 	mgmt_ch = shared_resource->mgmt_ch;
1505 
1506 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1507 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1508 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1509 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1510 
1511 	_spdk_bdev_channel_destroy_resource(ch);
1512 }
1513 
1514 int
1515 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1516 {
1517 	struct spdk_bdev_alias *tmp;
1518 
1519 	if (alias == NULL) {
1520 		SPDK_ERRLOG("Empty alias passed\n");
1521 		return -EINVAL;
1522 	}
1523 
1524 	if (spdk_bdev_get_by_name(alias)) {
1525 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1526 		return -EEXIST;
1527 	}
1528 
1529 	tmp = calloc(1, sizeof(*tmp));
1530 	if (tmp == NULL) {
1531 		SPDK_ERRLOG("Unable to allocate alias\n");
1532 		return -ENOMEM;
1533 	}
1534 
1535 	tmp->alias = strdup(alias);
1536 	if (tmp->alias == NULL) {
1537 		free(tmp);
1538 		SPDK_ERRLOG("Unable to allocate alias\n");
1539 		return -ENOMEM;
1540 	}
1541 
1542 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1543 
1544 	return 0;
1545 }
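/*
 * Usage sketch (editor's illustration): aliases participate in
 * spdk_bdev_get_by_name() lookups exactly like the primary name; "my_alias"
 * is a hypothetical alias.
 *
 *	if (spdk_bdev_alias_add(bdev, "my_alias") == 0) {
 *		assert(spdk_bdev_get_by_name("my_alias") == bdev);
 *	}
 */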
1546 
1547 int
1548 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1549 {
1550 	struct spdk_bdev_alias *tmp;
1551 
1552 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1553 		if (strcmp(alias, tmp->alias) == 0) {
1554 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1555 			free(tmp->alias);
1556 			free(tmp);
1557 			return 0;
1558 		}
1559 	}
1560 
1561 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1562 
1563 	return -ENOENT;
1564 }
1565 
1566 struct spdk_io_channel *
1567 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1568 {
1569 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1570 }
1571 
1572 const char *
1573 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1574 {
1575 	return bdev->name;
1576 }
1577 
1578 const char *
1579 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1580 {
1581 	return bdev->product_name;
1582 }
1583 
1584 const struct spdk_bdev_aliases_list *
1585 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1586 {
1587 	return &bdev->aliases;
1588 }
1589 
1590 uint32_t
1591 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1592 {
1593 	return bdev->blocklen;
1594 }
1595 
1596 uint64_t
1597 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1598 {
1599 	return bdev->blockcnt;
1600 }
1601 
1602 uint64_t
1603 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
1604 {
1605 	uint64_t iops_rate_limit = 0;
1606 
1607 	pthread_mutex_lock(&bdev->internal.mutex);
1608 	if (bdev->internal.qos) {
1609 		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
1610 	}
1611 	pthread_mutex_unlock(&bdev->internal.mutex);
1612 
1613 	return iops_rate_limit;
1614 }
1615 
1616 size_t
1617 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1618 {
1619 	/* TODO: push this logic down to the bdev modules */
1620 	if (bdev->need_aligned_buffer) {
1621 		return bdev->blocklen;
1622 	}
1623 
1624 	return 1;
1625 }
1626 
1627 uint32_t
1628 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1629 {
1630 	return bdev->optimal_io_boundary;
1631 }
1632 
1633 bool
1634 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1635 {
1636 	return bdev->write_cache;
1637 }
1638 
1639 const struct spdk_uuid *
1640 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1641 {
1642 	return &bdev->uuid;
1643 }
1644 
1645 int
1646 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1647 {
1648 	int ret;
1649 
1650 	pthread_mutex_lock(&bdev->internal.mutex);
1651 
1652 	/* bdev has open descriptors */
1653 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
1654 	    bdev->blockcnt > size) {
1655 		ret = -EBUSY;
1656 	} else {
1657 		bdev->blockcnt = size;
1658 		ret = 0;
1659 	}
1660 
1661 	pthread_mutex_unlock(&bdev->internal.mutex);
1662 
1663 	return ret;
1664 }
1665 
1666 /*
1667  * Convert I/O offset and length from bytes to blocks.
1668  *
1669  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1670  */
1671 static uint64_t
1672 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1673 			  uint64_t num_bytes, uint64_t *num_blocks)
1674 {
1675 	uint32_t block_size = bdev->blocklen;
1676 
1677 	*offset_blocks = offset_bytes / block_size;
1678 	*num_blocks = num_bytes / block_size;
1679 
1680 	return (offset_bytes % block_size) | (num_bytes % block_size);
1681 }
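/*
 * Worked example (editor's illustration): with blocklen = 512, offset_bytes =
 * 4096 and num_bytes = 8192 give offset_blocks = 8, num_blocks = 16, and the
 * OR of the two remainders is 0 (success). An unaligned num_bytes of 8000
 * leaves a remainder of 320, so the call returns non-zero and the caller
 * fails with -EINVAL.
 */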
1682 
1683 static bool
1684 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1685 {
1686 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this
1687 	 * indicates an overflow, i.e. the offset has wrapped around. */
1688 	if (offset_blocks + num_blocks < offset_blocks) {
1689 		return false;
1690 	}
1691 
1692 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1693 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1694 		return false;
1695 	}
1696 
1697 	return true;
1698 }
1699 
1700 int
1701 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1702 	       void *buf, uint64_t offset, uint64_t nbytes,
1703 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1704 {
1705 	uint64_t offset_blocks, num_blocks;
1706 
1707 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1708 		return -EINVAL;
1709 	}
1710 
1711 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1712 }
1713 
1714 int
1715 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1716 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1717 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1718 {
1719 	struct spdk_bdev *bdev = desc->bdev;
1720 	struct spdk_bdev_io *bdev_io;
1721 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1722 
1723 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1724 		return -EINVAL;
1725 	}
1726 
1727 	bdev_io = spdk_bdev_get_io(channel);
1728 	if (!bdev_io) {
1729 		return -ENOMEM;
1730 	}
1731 
1732 	bdev_io->internal.ch = channel;
1733 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1734 	bdev_io->u.bdev.iovs = &bdev_io->iov;
1735 	bdev_io->u.bdev.iovs[0].iov_base = buf;
1736 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
1737 	bdev_io->u.bdev.iovcnt = 1;
1738 	bdev_io->u.bdev.num_blocks = num_blocks;
1739 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1740 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1741 
1742 	spdk_bdev_io_submit(bdev_io);
1743 	return 0;
1744 }
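/*
 * Usage sketch (editor's illustration): a minimal single-block read, assuming
 * an open descriptor and a channel from spdk_bdev_get_io_channel(); read_done
 * and buf are hypothetical.
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, 0, 1, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// queue an spdk_bdev_io_wait_entry and retry later
 *	}
 */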
1745 
1746 int
1747 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1748 		struct iovec *iov, int iovcnt,
1749 		uint64_t offset, uint64_t nbytes,
1750 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1751 {
1752 	uint64_t offset_blocks, num_blocks;
1753 
1754 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1755 		return -EINVAL;
1756 	}
1757 
1758 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1759 }
1760 
1761 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1762 			   struct iovec *iov, int iovcnt,
1763 			   uint64_t offset_blocks, uint64_t num_blocks,
1764 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1765 {
1766 	struct spdk_bdev *bdev = desc->bdev;
1767 	struct spdk_bdev_io *bdev_io;
1768 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1769 
1770 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1771 		return -EINVAL;
1772 	}
1773 
1774 	bdev_io = spdk_bdev_get_io(channel);
1775 	if (!bdev_io) {
1776 		return -ENOMEM;
1777 	}
1778 
1779 	bdev_io->internal.ch = channel;
1780 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1781 	bdev_io->u.bdev.iovs = iov;
1782 	bdev_io->u.bdev.iovcnt = iovcnt;
1783 	bdev_io->u.bdev.num_blocks = num_blocks;
1784 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1785 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1786 
1787 	spdk_bdev_io_submit(bdev_io);
1788 	return 0;
1789 }
1790 
1791 int
1792 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1793 		void *buf, uint64_t offset, uint64_t nbytes,
1794 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1795 {
1796 	uint64_t offset_blocks, num_blocks;
1797 
1798 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1799 		return -EINVAL;
1800 	}
1801 
1802 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1803 }
1804 
1805 int
1806 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1807 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1808 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1809 {
1810 	struct spdk_bdev *bdev = desc->bdev;
1811 	struct spdk_bdev_io *bdev_io;
1812 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1813 
1814 	if (!desc->write) {
1815 		return -EBADF;
1816 	}
1817 
1818 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1819 		return -EINVAL;
1820 	}
1821 
1822 	bdev_io = spdk_bdev_get_io(channel);
1823 	if (!bdev_io) {
1824 		return -ENOMEM;
1825 	}
1826 
1827 	bdev_io->internal.ch = channel;
1828 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1829 	bdev_io->u.bdev.iovs = &bdev_io->iov;
1830 	bdev_io->u.bdev.iovs[0].iov_base = buf;
1831 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
1832 	bdev_io->u.bdev.iovcnt = 1;
1833 	bdev_io->u.bdev.num_blocks = num_blocks;
1834 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1835 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1836 
1837 	spdk_bdev_io_submit(bdev_io);
1838 	return 0;
1839 }
1840 
1841 int
1842 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1843 		 struct iovec *iov, int iovcnt,
1844 		 uint64_t offset, uint64_t len,
1845 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1846 {
1847 	uint64_t offset_blocks, num_blocks;
1848 
1849 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1850 		return -EINVAL;
1851 	}
1852 
1853 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1854 }
1855 
1856 int
1857 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1858 			struct iovec *iov, int iovcnt,
1859 			uint64_t offset_blocks, uint64_t num_blocks,
1860 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1861 {
1862 	struct spdk_bdev *bdev = desc->bdev;
1863 	struct spdk_bdev_io *bdev_io;
1864 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1865 
1866 	if (!desc->write) {
1867 		return -EBADF;
1868 	}
1869 
1870 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1871 		return -EINVAL;
1872 	}
1873 
1874 	bdev_io = spdk_bdev_get_io(channel);
1875 	if (!bdev_io) {
1876 		return -ENOMEM;
1877 	}
1878 
1879 	bdev_io->internal.ch = channel;
1880 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1881 	bdev_io->u.bdev.iovs = iov;
1882 	bdev_io->u.bdev.iovcnt = iovcnt;
1883 	bdev_io->u.bdev.num_blocks = num_blocks;
1884 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1885 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1886 
1887 	spdk_bdev_io_submit(bdev_io);
1888 	return 0;
1889 }
1890 
1891 int
1892 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1893 		       uint64_t offset, uint64_t len,
1894 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1895 {
1896 	uint64_t offset_blocks, num_blocks;
1897 
1898 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1899 		return -EINVAL;
1900 	}
1901 
1902 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1903 }
1904 
1905 int
1906 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1907 			      uint64_t offset_blocks, uint64_t num_blocks,
1908 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1909 {
1910 	struct spdk_bdev *bdev = desc->bdev;
1911 	struct spdk_bdev_io *bdev_io;
1912 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1913 	uint64_t len;
1914 	bool split_request = false;
1915 
1916 	if (!desc->write) {
1917 		return -EBADF;
1918 	}
1919 
1920 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1921 		return -EINVAL;
1922 	}
1923 
1924 	bdev_io = spdk_bdev_get_io(channel);
1925 
1926 	if (!bdev_io) {
1927 		return -ENOMEM;
1928 	}
1929 
1930 	bdev_io->internal.ch = channel;
1931 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1932 
1933 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1934 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1935 		bdev_io->u.bdev.num_blocks = num_blocks;
1936 		bdev_io->u.bdev.iovs = NULL;
1937 		bdev_io->u.bdev.iovcnt = 0;
1938 
1939 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
1940 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1941 
1942 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1943 
1944 		if (len > ZERO_BUFFER_SIZE) {
1945 			split_request = true;
1946 			len = ZERO_BUFFER_SIZE;
1947 		}
1948 
1949 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1950 		bdev_io->u.bdev.iovs = &bdev_io->iov;
1951 		bdev_io->u.bdev.iovs[0].iov_base = g_bdev_mgr.zero_buffer;
1952 		bdev_io->u.bdev.iovs[0].iov_len = len;
1953 		bdev_io->u.bdev.iovcnt = 1;
1954 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1955 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1956 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1957 	} else {
1958 		spdk_bdev_free_io(bdev_io);
1959 		return -ENOTSUP;
1960 	}
1961 
1962 	if (split_request) {
1963 		bdev_io->u.bdev.stored_user_cb = cb;
1964 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1965 	} else {
1966 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1967 	}
1968 	spdk_bdev_io_submit(bdev_io);
1969 	return 0;
1970 }
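
/*
 * Split arithmetic sketch for the fallback path above, assuming a
 * hypothetical bdev with 512-byte blocks and no native WRITE_ZEROES
 * support: a request for 6144 blocks (3 MiB) exceeds ZERO_BUFFER_SIZE
 * (1 MiB), so the first write covers 2048 blocks from
 * g_bdev_mgr.zero_buffer and split_remaining_num_blocks is left at 4096.
 * spdk_bdev_write_zeroes_split() then issues two more 2048-block writes
 * before invoking the user's callback.
 */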
1971 
1972 int
1973 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1974 		uint64_t offset, uint64_t nbytes,
1975 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1976 {
1977 	uint64_t offset_blocks, num_blocks;
1978 
1979 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1980 		return -EINVAL;
1981 	}
1982 
1983 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1984 }
1985 
1986 int
1987 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1988 		       uint64_t offset_blocks, uint64_t num_blocks,
1989 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1990 {
1991 	struct spdk_bdev *bdev = desc->bdev;
1992 	struct spdk_bdev_io *bdev_io;
1993 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1994 
1995 	if (!desc->write) {
1996 		return -EBADF;
1997 	}
1998 
1999 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2000 		return -EINVAL;
2001 	}
2002 
2003 	if (num_blocks == 0) {
		SPDK_ERRLOG("Can't unmap 0 blocks\n");
2005 		return -EINVAL;
2006 	}
2007 
2008 	bdev_io = spdk_bdev_get_io(channel);
2009 	if (!bdev_io) {
2010 		return -ENOMEM;
2011 	}
2012 
2013 	bdev_io->internal.ch = channel;
2014 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2015 
2016 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2017 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2018 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2019 	bdev_io->u.bdev.iovcnt = 1;
2020 
2021 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2022 	bdev_io->u.bdev.num_blocks = num_blocks;
2023 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2024 
2025 	spdk_bdev_io_submit(bdev_io);
2026 	return 0;
2027 }
2028 
2029 int
2030 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2031 		uint64_t offset, uint64_t length,
2032 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2033 {
2034 	uint64_t offset_blocks, num_blocks;
2035 
2036 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2037 		return -EINVAL;
2038 	}
2039 
2040 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2041 }
2042 
2043 int
2044 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2045 		       uint64_t offset_blocks, uint64_t num_blocks,
2046 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2047 {
2048 	struct spdk_bdev *bdev = desc->bdev;
2049 	struct spdk_bdev_io *bdev_io;
2050 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2051 
2052 	if (!desc->write) {
2053 		return -EBADF;
2054 	}
2055 
2056 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2057 		return -EINVAL;
2058 	}
2059 
2060 	bdev_io = spdk_bdev_get_io(channel);
2061 	if (!bdev_io) {
2062 		return -ENOMEM;
2063 	}
2064 
2065 	bdev_io->internal.ch = channel;
2066 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2067 	bdev_io->u.bdev.iovs = NULL;
2068 	bdev_io->u.bdev.iovcnt = 0;
2069 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2070 	bdev_io->u.bdev.num_blocks = num_blocks;
2071 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2072 
2073 	spdk_bdev_io_submit(bdev_io);
2074 	return 0;
2075 }
2076 
2077 static void
2078 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2079 {
2080 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2081 	struct spdk_bdev_io *bdev_io;
2082 
2083 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2084 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2085 	spdk_bdev_io_submit_reset(bdev_io);
2086 }
2087 
2088 static void
2089 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2090 {
2091 	struct spdk_io_channel		*ch;
2092 	struct spdk_bdev_channel	*channel;
2093 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2094 	struct spdk_bdev_shared_resource *shared_resource;
2095 	bdev_io_tailq_t			tmp_queued;
2096 
2097 	TAILQ_INIT(&tmp_queued);
2098 
2099 	ch = spdk_io_channel_iter_get_channel(i);
2100 	channel = spdk_io_channel_get_ctx(ch);
2101 	shared_resource = channel->shared_resource;
2102 	mgmt_channel = shared_resource->mgmt_ch;
2103 
2104 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2105 
2106 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2107 		/* The QoS object is always valid and readable while
2108 		 * the channel flag is set, so the lock here should not
2109 		 * be necessary. We're not in the fast path though, so
2110 		 * just take it anyway. */
2111 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2112 		if (channel->bdev->internal.qos->ch == channel) {
2113 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2114 		}
2115 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2116 	}
2117 
2118 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2119 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2120 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2121 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2122 
2123 	spdk_for_each_channel_continue(i, 0);
2124 }
2125 
2126 static void
2127 _spdk_bdev_start_reset(void *ctx)
2128 {
2129 	struct spdk_bdev_channel *ch = ctx;
2130 
2131 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2132 			      ch, _spdk_bdev_reset_dev);
2133 }
2134 
2135 static void
2136 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2137 {
2138 	struct spdk_bdev *bdev = ch->bdev;
2139 
2140 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2141 
2142 	pthread_mutex_lock(&bdev->internal.mutex);
2143 	if (bdev->internal.reset_in_progress == NULL) {
2144 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2145 		/*
2146 		 * Take a channel reference for the target bdev for the life of this
2147 		 *  reset.  This guards against the channel getting destroyed while
2148 		 *  spdk_for_each_channel() calls related to this reset IO are in
2149 		 *  progress.  We will release the reference when this reset is
2150 		 *  completed.
2151 		 */
2152 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2153 		_spdk_bdev_start_reset(ch);
2154 	}
2155 	pthread_mutex_unlock(&bdev->internal.mutex);
2156 }
2157 
2158 int
2159 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2160 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2161 {
2162 	struct spdk_bdev *bdev = desc->bdev;
2163 	struct spdk_bdev_io *bdev_io;
2164 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2165 
2166 	bdev_io = spdk_bdev_get_io(channel);
2167 	if (!bdev_io) {
2168 		return -ENOMEM;
2169 	}
2170 
2171 	bdev_io->internal.ch = channel;
2172 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2173 	bdev_io->u.reset.ch_ref = NULL;
2174 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2175 
2176 	pthread_mutex_lock(&bdev->internal.mutex);
2177 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2178 	pthread_mutex_unlock(&bdev->internal.mutex);
2179 
2180 	_spdk_bdev_channel_start_reset(channel);
2181 
2182 	return 0;
2183 }
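
/*
 * Illustrative call, with hypothetical names: request a reset of the whole
 * bdev.  While the reset is in progress every channel is frozen and its
 * queued I/O are aborted, so callers should expect outstanding requests to
 * fail before my_reset_done() runs.
 *
 *	rc = spdk_bdev_reset(desc, ch, my_reset_done, my_ctx);
 */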
2184 
2185 void
2186 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2187 		      struct spdk_bdev_io_stat *stat)
2188 {
2189 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2190 
2191 	*stat = channel->stat;
2192 }
2193 
2194 static void
2195 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2196 {
2197 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2198 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2199 
2200 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2201 			    bdev_iostat_ctx->cb_arg, 0);
2202 	free(bdev_iostat_ctx);
2203 }
2204 
2205 static void
2206 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2207 {
2208 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2209 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2210 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2211 
2212 	bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read;
2213 	bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops;
2214 	bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written;
2215 	bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops;
2216 
2217 	spdk_for_each_channel_continue(i, 0);
2218 }
2219 
2220 void
2221 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2222 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2223 {
2224 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2225 
2226 	assert(bdev != NULL);
2227 	assert(stat != NULL);
2228 	assert(cb != NULL);
2229 
2230 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2231 	if (bdev_iostat_ctx == NULL) {
2232 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2233 		cb(bdev, stat, cb_arg, -ENOMEM);
2234 		return;
2235 	}
2236 
2237 	bdev_iostat_ctx->stat = stat;
2238 	bdev_iostat_ctx->cb = cb;
2239 	bdev_iostat_ctx->cb_arg = cb_arg;
2240 
2241 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2242 			      _spdk_bdev_get_each_channel_stat,
2243 			      bdev_iostat_ctx,
2244 			      _spdk_bdev_get_device_stat_done);
2245 }
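
/*
 * Illustrative usage sketch (my_stat_done, my_stat and my_ctx are
 * hypothetical): aggregate I/O totals across every channel of a bdev.  The
 * per-channel loop above only adds to the counters, so the caller should
 * pass a zeroed structure and keep it valid until the callback runs.
 *
 *	static void
 *	my_stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
 *		     void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("%" PRIu64 " bytes read\n", stat->bytes_read);
 *		}
 *	}
 *
 *	memset(&my_stat, 0, sizeof(my_stat));
 *	spdk_bdev_get_device_stat(bdev, &my_stat, my_stat_done, my_ctx);
 */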
2246 
2247 int
2248 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2249 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2250 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2251 {
2252 	struct spdk_bdev *bdev = desc->bdev;
2253 	struct spdk_bdev_io *bdev_io;
2254 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2255 
2256 	if (!desc->write) {
2257 		return -EBADF;
2258 	}
2259 
2260 	bdev_io = spdk_bdev_get_io(channel);
2261 	if (!bdev_io) {
2262 		return -ENOMEM;
2263 	}
2264 
2265 	bdev_io->internal.ch = channel;
2266 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2267 	bdev_io->u.nvme_passthru.cmd = *cmd;
2268 	bdev_io->u.nvme_passthru.buf = buf;
2269 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2270 	bdev_io->u.nvme_passthru.md_buf = NULL;
2271 	bdev_io->u.nvme_passthru.md_len = 0;
2272 
2273 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2274 
2275 	spdk_bdev_io_submit(bdev_io);
2276 	return 0;
2277 }
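
/*
 * Illustrative sketch, assuming an NVMe-backed bdev and a descriptor opened
 * for writing: issue an Identify Controller admin command through the
 * passthru interface.  id_buf, my_admin_done and my_ctx are hypothetical.
 *
 *	struct spdk_nvme_cmd cmd = {};
 *
 *	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *	cmd.cdw10 = 1;	(CNS 1 = Identify Controller per the NVMe spec)
 *	rc = spdk_bdev_nvme_admin_passthru(desc, ch, &cmd, id_buf, 4096,
 *					   my_admin_done, my_ctx);
 */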
2278 
2279 int
2280 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2281 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2282 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2283 {
2284 	struct spdk_bdev *bdev = desc->bdev;
2285 	struct spdk_bdev_io *bdev_io;
2286 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2287 
2288 	if (!desc->write) {
2289 		/*
		 * Do not try to parse the NVMe command - we could potentially use bits in the
		 *  opcode to determine whether the command is a read or a write, but for now
		 *  just do not allow io_passthru with a read-only descriptor.
2293 		 */
2294 		return -EBADF;
2295 	}
2296 
2297 	bdev_io = spdk_bdev_get_io(channel);
2298 	if (!bdev_io) {
2299 		return -ENOMEM;
2300 	}
2301 
2302 	bdev_io->internal.ch = channel;
2303 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2304 	bdev_io->u.nvme_passthru.cmd = *cmd;
2305 	bdev_io->u.nvme_passthru.buf = buf;
2306 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2307 	bdev_io->u.nvme_passthru.md_buf = NULL;
2308 	bdev_io->u.nvme_passthru.md_len = 0;
2309 
2310 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2311 
2312 	spdk_bdev_io_submit(bdev_io);
2313 	return 0;
2314 }
2315 
2316 int
2317 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2318 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2319 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2320 {
2321 	struct spdk_bdev *bdev = desc->bdev;
2322 	struct spdk_bdev_io *bdev_io;
2323 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2324 
2325 	if (!desc->write) {
2326 		/*
		 * Do not try to parse the NVMe command - we could potentially use bits in the
		 *  opcode to determine whether the command is a read or a write, but for now
		 *  just do not allow io_passthru with a read-only descriptor.
2330 		 */
2331 		return -EBADF;
2332 	}
2333 
2334 	bdev_io = spdk_bdev_get_io(channel);
2335 	if (!bdev_io) {
2336 		return -ENOMEM;
2337 	}
2338 
2339 	bdev_io->internal.ch = channel;
2340 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2341 	bdev_io->u.nvme_passthru.cmd = *cmd;
2342 	bdev_io->u.nvme_passthru.buf = buf;
2343 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2344 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2345 	bdev_io->u.nvme_passthru.md_len = md_len;
2346 
2347 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2348 
2349 	spdk_bdev_io_submit(bdev_io);
2350 	return 0;
2351 }
2352 
2353 int
2354 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2355 			struct spdk_bdev_io_wait_entry *entry)
2356 {
2357 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2358 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2359 
2360 	if (bdev != entry->bdev) {
2361 		SPDK_ERRLOG("bdevs do not match\n");
2362 		return -EINVAL;
2363 	}
2364 
2365 	if (mgmt_ch->per_thread_cache_count > 0) {
		SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2367 		return -EINVAL;
2368 	}
2369 
2370 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2371 	return 0;
2372 }
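
/*
 * Sketch of the intended -ENOMEM retry pattern, with hypothetical names.
 * When a submission fails because the spdk_bdev_io pool is exhausted, the
 * caller registers a wait entry on the same channel and resubmits from the
 * callback once an spdk_bdev_io is freed on this thread:
 *
 *	rc = spdk_bdev_read_blocks(desc, ch, buf, offset, num, my_done, req);
 *	if (rc == -ENOMEM) {
 *		req->bdev_io_wait.bdev = bdev;
 *		req->bdev_io_wait.cb_fn = my_resubmit;	(calls read_blocks again)
 *		req->bdev_io_wait.cb_arg = req;
 *		spdk_bdev_queue_io_wait(bdev, ch, &req->bdev_io_wait);
 *	}
 */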
2373 
2374 static void
2375 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2376 {
2377 	struct spdk_bdev *bdev = bdev_ch->bdev;
2378 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2379 	struct spdk_bdev_io *bdev_io;
2380 
2381 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2382 		/*
2383 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2384 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2385 		 *  the context of a completion, because the resources for the I/O are
2386 		 *  not released until control returns to the bdev poller.  Also, we
2387 		 *  may require several small I/O to complete before a larger I/O
2388 		 *  (that requires splitting) can be submitted.
2389 		 */
2390 		return;
2391 	}
2392 
2393 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2394 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2395 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2396 		bdev_io->internal.ch->io_outstanding++;
2397 		shared_resource->io_outstanding++;
2398 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2399 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2400 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2401 			break;
2402 		}
2403 	}
2404 }
2405 
2406 static inline void
2407 _spdk_bdev_io_complete(void *ctx)
2408 {
2409 	struct spdk_bdev_io *bdev_io = ctx;
2410 
2411 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2412 		/*
2413 		 * Send the completion to the thread that originally submitted the I/O,
2414 		 * which may not be the current thread in the case of QoS.
2415 		 */
2416 		if (bdev_io->internal.io_submit_ch) {
2417 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2418 			bdev_io->internal.io_submit_ch = NULL;
2419 		}
2420 
2421 		/*
2422 		 * Defer completion to avoid potential infinite recursion if the
2423 		 * user's completion callback issues a new I/O.
2424 		 */
2425 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2426 				     _spdk_bdev_io_complete, bdev_io);
2427 		return;
2428 	}
2429 
2430 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2431 		switch (bdev_io->type) {
2432 		case SPDK_BDEV_IO_TYPE_READ:
2433 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2434 			bdev_io->internal.ch->stat.num_read_ops++;
2435 			bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2436 			break;
2437 		case SPDK_BDEV_IO_TYPE_WRITE:
2438 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2439 			bdev_io->internal.ch->stat.num_write_ops++;
2440 			bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2441 			break;
2442 		default:
2443 			break;
2444 		}
2445 	}
2446 
2447 #ifdef SPDK_CONFIG_VTUNE
2448 	uint64_t now_tsc = spdk_get_ticks();
2449 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2450 		uint64_t data[5];
2451 
2452 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2453 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2454 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2455 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2456 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2457 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2458 
2459 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2460 				   __itt_metadata_u64, 5, data);
2461 
2462 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2463 		bdev_io->internal.ch->start_tsc = now_tsc;
2464 	}
2465 #endif
2466 
2467 	assert(bdev_io->internal.cb != NULL);
2468 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2469 
2470 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2471 			     bdev_io->internal.caller_ctx);
2472 }
2473 
2474 static void
2475 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2476 {
2477 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2478 
2479 	if (bdev_io->u.reset.ch_ref != NULL) {
2480 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2481 		bdev_io->u.reset.ch_ref = NULL;
2482 	}
2483 
2484 	_spdk_bdev_io_complete(bdev_io);
2485 }
2486 
2487 static void
2488 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2489 {
2490 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2491 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2492 
2493 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2494 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2495 		_spdk_bdev_channel_start_reset(ch);
2496 	}
2497 
2498 	spdk_for_each_channel_continue(i, 0);
2499 }
2500 
2501 void
2502 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2503 {
2504 	struct spdk_bdev *bdev = bdev_io->bdev;
2505 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2506 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2507 
2508 	bdev_io->internal.status = status;
2509 
2510 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2511 		bool unlock_channels = false;
2512 
2513 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2514 			SPDK_ERRLOG("NOMEM returned for reset\n");
2515 		}
2516 		pthread_mutex_lock(&bdev->internal.mutex);
2517 		if (bdev_io == bdev->internal.reset_in_progress) {
2518 			bdev->internal.reset_in_progress = NULL;
2519 			unlock_channels = true;
2520 		}
2521 		pthread_mutex_unlock(&bdev->internal.mutex);
2522 
2523 		if (unlock_channels) {
2524 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2525 					      bdev_io, _spdk_bdev_reset_complete);
2526 			return;
2527 		}
2528 	} else {
2529 		assert(bdev_ch->io_outstanding > 0);
2530 		assert(shared_resource->io_outstanding > 0);
2531 		bdev_ch->io_outstanding--;
2532 		shared_resource->io_outstanding--;
2533 
2534 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2535 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2536 			/*
2537 			 * Wait for some of the outstanding I/O to complete before we
2538 			 *  retry any of the nomem_io.  Normally we will wait for
2539 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2540 			 *  depth channels we will instead wait for half to complete.
2541 			 */
2542 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2543 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2544 			return;
2545 		}
2546 
2547 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2548 			_spdk_bdev_ch_retry_io(bdev_ch);
2549 		}
2550 	}
2551 
2552 	_spdk_bdev_io_complete(bdev_io);
2553 }
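
/*
 * Module-side sketch (my_backend_submit is hypothetical): a bdev module
 * that runs out of internal resources completes the I/O with
 * SPDK_BDEV_IO_STATUS_NOMEM, which requeues it on the shared nomem_io list
 * above instead of failing it back to the user:
 *
 *	rc = my_backend_submit(ch, bdev_io);
 *	if (rc == -ENOMEM) {
 *		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
 *	} else if (rc != 0) {
 *		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *	}
 */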
2554 
2555 void
2556 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2557 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2558 {
2559 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2560 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2561 	} else {
2562 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2563 		bdev_io->internal.error.scsi.sc = sc;
2564 		bdev_io->internal.error.scsi.sk = sk;
2565 		bdev_io->internal.error.scsi.asc = asc;
2566 		bdev_io->internal.error.scsi.ascq = ascq;
2567 	}
2568 
2569 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2570 }
2571 
2572 void
2573 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2574 			     int *sc, int *sk, int *asc, int *ascq)
2575 {
2576 	assert(sc != NULL);
2577 	assert(sk != NULL);
2578 	assert(asc != NULL);
2579 	assert(ascq != NULL);
2580 
2581 	switch (bdev_io->internal.status) {
2582 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2583 		*sc = SPDK_SCSI_STATUS_GOOD;
2584 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2585 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2586 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2587 		break;
2588 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2589 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2590 		break;
2591 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2592 		*sc = bdev_io->internal.error.scsi.sc;
2593 		*sk = bdev_io->internal.error.scsi.sk;
2594 		*asc = bdev_io->internal.error.scsi.asc;
2595 		*ascq = bdev_io->internal.error.scsi.ascq;
2596 		break;
2597 	default:
2598 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2599 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2600 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2601 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2602 		break;
2603 	}
2604 }
2605 
2606 void
2607 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2608 {
2609 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2610 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2611 	} else {
2612 		bdev_io->internal.error.nvme.sct = sct;
2613 		bdev_io->internal.error.nvme.sc = sc;
2614 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2615 	}
2616 
2617 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2618 }
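
/*
 * Typical caller sketch, assuming an NVMe completion (struct spdk_nvme_cpl)
 * named cpl is in hand: forward the status code type and status code as-is.
 *
 *	spdk_bdev_io_complete_nvme_status(bdev_io, cpl->status.sct,
 *					  cpl->status.sc);
 */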
2619 
2620 void
2621 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2622 {
2623 	assert(sct != NULL);
2624 	assert(sc != NULL);
2625 
2626 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2627 		*sct = bdev_io->internal.error.nvme.sct;
2628 		*sc = bdev_io->internal.error.nvme.sc;
2629 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2630 		*sct = SPDK_NVME_SCT_GENERIC;
2631 		*sc = SPDK_NVME_SC_SUCCESS;
2632 	} else {
2633 		*sct = SPDK_NVME_SCT_GENERIC;
2634 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2635 	}
2636 }
2637 
2638 struct spdk_thread *
2639 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2640 {
2641 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
2642 }
2643 
2644 static void
2645 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
2646 			   enum spdk_bdev_qos_type qos_type)
2647 {
2648 	uint64_t	min_qos_set = 0;
2649 
2650 	switch (qos_type) {
2651 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2652 		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
2653 		break;
2654 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2655 		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
2656 		break;
2657 	default:
2658 		SPDK_ERRLOG("Unsupported QoS type.\n");
2659 		return;
2660 	}
2661 
2662 	if (qos_set % min_qos_set) {
		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
			    qos_set, bdev->name, min_qos_set);
		SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
2666 		return;
2667 	}
2668 
2669 	if (!bdev->internal.qos) {
2670 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
2671 		if (!bdev->internal.qos) {
2672 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
2673 			return;
2674 		}
2675 	}
2676 
2677 	switch (qos_type) {
2678 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2679 		bdev->internal.qos->iops_rate_limit = qos_set;
2680 		break;
2681 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2682 		bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024;
2683 		break;
2684 	default:
2685 		break;
2686 	}
2687 
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
		      bdev->name, qos_type, qos_set);
}
2693 
2694 static void
2695 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2696 {
2697 	struct spdk_conf_section	*sp = NULL;
2698 	const char			*val = NULL;
2699 	uint64_t			qos_set = 0;
2700 	int				i = 0, j = 0;
2701 
2702 	sp = spdk_conf_find_section(NULL, "QoS");
2703 	if (!sp) {
2704 		return;
2705 	}
2706 
2707 	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
2708 		i = 0;
2709 		while (true) {
2710 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
2711 			if (!val) {
2712 				break;
2713 			}
2714 
2715 			if (strcmp(bdev->name, val) != 0) {
2716 				i++;
2717 				continue;
2718 			}
2719 
2720 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
2721 			if (val) {
2722 				qos_set = strtoull(val, NULL, 10);
2723 				_spdk_bdev_qos_config_type(bdev, qos_set, j);
2724 			}
2725 
2726 			break;
2727 		}
2728 
2729 		j++;
2730 	}
}
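
/*
 * Example of the configuration-file section parsed above; the bdev name and
 * values are illustrative.  Each value must be a multiple of
 * SPDK_BDEV_QOS_MIN_IOS_PER_SEC or SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC,
 * respectively, and the bandwidth limit is given in MB/s:
 *
 *	[QoS]
 *	  Limit_IOPS Malloc0 20000
 *	  Limit_BWPS Malloc0 100
 */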
2734 
2735 static int
2736 spdk_bdev_init(struct spdk_bdev *bdev)
2737 {
2738 	assert(bdev->module != NULL);
2739 
2740 	if (!bdev->name) {
2741 		SPDK_ERRLOG("Bdev name is NULL\n");
2742 		return -EINVAL;
2743 	}
2744 
2745 	if (spdk_bdev_get_by_name(bdev->name)) {
2746 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2747 		return -EEXIST;
2748 	}
2749 
2750 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
2751 
2752 	TAILQ_INIT(&bdev->internal.open_descs);
2753 
2754 	TAILQ_INIT(&bdev->aliases);
2755 
2756 	bdev->internal.reset_in_progress = NULL;
2757 
2758 	_spdk_bdev_qos_config(bdev);
2759 
2760 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2761 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2762 				sizeof(struct spdk_bdev_channel));
2763 
2764 	pthread_mutex_init(&bdev->internal.mutex, NULL);
2765 	return 0;
2766 }
2767 
2768 static void
2769 spdk_bdev_destroy_cb(void *io_device)
2770 {
2771 	int			rc;
2772 	struct spdk_bdev	*bdev;
2773 	spdk_bdev_unregister_cb	cb_fn;
2774 	void			*cb_arg;
2775 
2776 	bdev = __bdev_from_io_dev(io_device);
2777 	cb_fn = bdev->internal.unregister_cb;
2778 	cb_arg = bdev->internal.unregister_ctx;
2779 
2780 	rc = bdev->fn_table->destruct(bdev->ctxt);
2781 	if (rc < 0) {
2782 		SPDK_ERRLOG("destruct failed\n");
2783 	}
2784 	if (rc <= 0 && cb_fn != NULL) {
2785 		cb_fn(cb_arg, rc);
2786 	}
2787 }
2788 
2790 static void
2791 spdk_bdev_fini(struct spdk_bdev *bdev)
2792 {
2793 	pthread_mutex_destroy(&bdev->internal.mutex);
2794 
2795 	free(bdev->internal.qos);
2796 
2797 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
2798 }
2799 
2800 static void
2801 spdk_bdev_start(struct spdk_bdev *bdev)
2802 {
2803 	struct spdk_bdev_module *module;
2804 	uint32_t action;
2805 
2806 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2807 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
2808 
2809 	/* Examine configuration before initializing I/O */
2810 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
2811 		if (module->examine_config) {
2812 			action = module->internal.action_in_progress;
2813 			module->internal.action_in_progress++;
2814 			module->examine_config(bdev);
2815 			if (action != module->internal.action_in_progress) {
2816 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
2817 					    module->name);
2818 			}
2819 		}
2820 	}
2821 
2822 	if (bdev->internal.claim_module) {
2823 		return;
2824 	}
2825 
2826 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
2827 		if (module->examine_disk) {
2828 			module->internal.action_in_progress++;
2829 			module->examine_disk(bdev);
2830 		}
2831 	}
2832 }
2833 
2834 int
2835 spdk_bdev_register(struct spdk_bdev *bdev)
2836 {
2837 	int rc = spdk_bdev_init(bdev);
2838 
2839 	if (rc == 0) {
2840 		spdk_bdev_start(bdev);
2841 	}
2842 
2843 	return rc;
2844 }
2845 
2846 static void
2847 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2848 {
2849 	struct spdk_bdev **bdevs;
2850 	struct spdk_bdev *base;
2851 	size_t i, j, k;
2852 	bool found;
2853 
2854 	/* Iterate over base bdevs to remove vbdev from them. */
2855 	for (i = 0; i < vbdev->internal.base_bdevs_cnt; i++) {
2856 		found = false;
2857 		base = vbdev->internal.base_bdevs[i];
2858 
2859 		for (j = 0; j < base->vbdevs_cnt; j++) {
2860 			if (base->vbdevs[j] != vbdev) {
2861 				continue;
2862 			}
2863 
2864 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2865 				base->vbdevs[k] = base->vbdevs[k + 1];
2866 			}
2867 
2868 			base->vbdevs_cnt--;
2869 			if (base->vbdevs_cnt > 0) {
2870 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
				/* It would be odd if shrinking a memory block failed. */
2872 				assert(bdevs);
2873 				base->vbdevs = bdevs;
2874 			} else {
2875 				free(base->vbdevs);
2876 				base->vbdevs = NULL;
2877 			}
2878 
2879 			found = true;
2880 			break;
2881 		}
2882 
2883 		if (!found) {
			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
2885 		}
2886 	}
2887 
2888 	free(vbdev->internal.base_bdevs);
2889 	vbdev->internal.base_bdevs = NULL;
2890 	vbdev->internal.base_bdevs_cnt = 0;
2891 }
2892 
2893 static int
2894 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2895 {
2896 	struct spdk_bdev **vbdevs;
2897 	struct spdk_bdev *base;
2898 	size_t i;
2899 
2900 	/* Adding base bdevs isn't supported (yet?). */
2901 	assert(vbdev->internal.base_bdevs_cnt == 0);
2902 
2903 	vbdev->internal.base_bdevs = malloc(cnt * sizeof(vbdev->internal.base_bdevs[0]));
2904 	if (!vbdev->internal.base_bdevs) {
		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2906 		return -ENOMEM;
2907 	}
2908 
2909 	memcpy(vbdev->internal.base_bdevs, base_bdevs, cnt * sizeof(vbdev->internal.base_bdevs[0]));
2910 	vbdev->internal.base_bdevs_cnt = cnt;
2911 
2912 	/* Iterate over base bdevs to add this vbdev to them. */
2913 	for (i = 0; i < cnt; i++) {
2914 		base = vbdev->internal.base_bdevs[i];
2915 
2916 		assert(base != NULL);
2917 		assert(base->internal.claim_module != NULL);
2918 
2919 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2920 		if (!vbdevs) {
2921 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2922 			spdk_vbdev_remove_base_bdevs(vbdev);
2923 			return -ENOMEM;
2924 		}
2925 
2926 		vbdevs[base->vbdevs_cnt] = vbdev;
2927 		base->vbdevs = vbdevs;
2928 		base->vbdevs_cnt++;
2929 	}
2930 
2931 	return 0;
2932 }
2933 
2934 int
2935 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2936 {
2937 	int rc;
2938 
2939 	rc = spdk_bdev_init(vbdev);
2940 	if (rc) {
2941 		return rc;
2942 	}
2943 
2944 	if (base_bdev_count == 0) {
2945 		spdk_bdev_start(vbdev);
2946 		return 0;
2947 	}
2948 
2949 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2950 	if (rc) {
2951 		spdk_bdev_fini(vbdev);
2952 		return rc;
2953 	}
2954 
2955 	spdk_bdev_start(vbdev);
2956 	return 0;
2958 }
2959 
2960 void
2961 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
2962 {
2963 	if (bdev->internal.unregister_cb != NULL) {
2964 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
2965 	}
2966 }
2967 
2968 static void
2969 _remove_notify(void *arg)
2970 {
2971 	struct spdk_bdev_desc *desc = arg;
2972 
2973 	desc->remove_cb(desc->remove_ctx);
2974 }
2975 
2976 void
2977 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2978 {
2979 	struct spdk_bdev_desc	*desc, *tmp;
2980 	bool			do_destruct = true;
2981 	struct spdk_thread	*thread;
2982 
2983 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2984 
2985 	thread = spdk_get_thread();
2986 	if (!thread) {
2987 		/* The user called this from a non-SPDK thread. */
2988 		if (cb_fn != NULL) {
2989 			cb_fn(cb_arg, -ENOTSUP);
2990 		}
2991 		return;
2992 	}
2993 
2994 	pthread_mutex_lock(&bdev->internal.mutex);
2995 
2996 	spdk_vbdev_remove_base_bdevs(bdev);
2997 
2998 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
2999 	bdev->internal.unregister_cb = cb_fn;
3000 	bdev->internal.unregister_ctx = cb_arg;
3001 
3002 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3003 		if (desc->remove_cb) {
3004 			do_destruct = false;
3005 			/*
3006 			 * Defer invocation of the remove_cb to a separate message that will
3007 			 *  run later on this thread.  This ensures this context unwinds and
3008 			 *  we don't recursively unregister this bdev again if the remove_cb
3009 			 *  immediately closes its descriptor.
3010 			 */
3011 			if (!desc->remove_scheduled) {
3012 				/* Avoid scheduling removal of the same descriptor multiple times. */
3013 				desc->remove_scheduled = true;
3014 				spdk_thread_send_msg(thread, _remove_notify, desc);
3015 			}
3016 		}
3017 	}
3018 
3019 	if (!do_destruct) {
3020 		pthread_mutex_unlock(&bdev->internal.mutex);
3021 		return;
3022 	}
3023 
3024 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3025 	pthread_mutex_unlock(&bdev->internal.mutex);
3026 
3027 	spdk_bdev_fini(bdev);
3028 }
3029 
3030 int
3031 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3032 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3033 {
3034 	struct spdk_bdev_desc *desc;
3035 
3036 	desc = calloc(1, sizeof(*desc));
3037 	if (desc == NULL) {
3038 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3039 		return -ENOMEM;
3040 	}
3041 
3042 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3043 		      spdk_get_thread());
3044 
3045 	pthread_mutex_lock(&bdev->internal.mutex);
3046 
3047 	if (write && bdev->internal.claim_module) {
3048 		SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name);
3049 		free(desc);
3050 		pthread_mutex_unlock(&bdev->internal.mutex);
3051 		return -EPERM;
3052 	}
3053 
3054 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3055 
3056 	desc->bdev = bdev;
3057 	desc->remove_cb = remove_cb;
3058 	desc->remove_ctx = remove_ctx;
3059 	desc->write = write;
3060 	*_desc = desc;
3061 
3062 	pthread_mutex_unlock(&bdev->internal.mutex);
3063 
3064 	return 0;
3065 }
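
/*
 * Illustrative open sketch with hypothetical names.  The remove callback
 * fires if the bdev is unregistered while the descriptor is still open; a
 * typical implementation closes its descriptor from that callback, which is
 * safe because the notification is deferred (see spdk_bdev_unregister()).
 *
 *	static void
 *	my_remove_cb(void *ctx)
 *	{
 *		struct my_app *app = ctx;
 *
 *		spdk_bdev_close(app->desc);
 *	}
 *
 *	rc = spdk_bdev_open(bdev, true, my_remove_cb, app, &app->desc);
 */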
3066 
3067 void
3068 spdk_bdev_close(struct spdk_bdev_desc *desc)
3069 {
3070 	struct spdk_bdev *bdev = desc->bdev;
3071 	bool do_unregister = false;
3072 
3073 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3074 		      spdk_get_thread());
3075 
3076 	pthread_mutex_lock(&bdev->internal.mutex);
3077 
3078 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3079 	free(desc);
3080 
3081 	/* If no more descriptors, kill QoS channel */
3082 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3083 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3084 			      bdev->name, spdk_get_thread());
3085 
3086 		if (spdk_bdev_qos_destroy(bdev)) {
3087 			/* There isn't anything we can do to recover here. Just let the
3088 			 * old QoS poller keep running. The QoS handling won't change
3089 			 * cores when the user allocates a new channel, but it won't break. */
3090 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3091 		}
3092 	}
3093 
3094 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3095 		do_unregister = true;
3096 	}
3097 	pthread_mutex_unlock(&bdev->internal.mutex);
3098 
3099 	if (do_unregister == true) {
3100 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3101 	}
3102 }
3103 
3104 int
3105 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3106 			    struct spdk_bdev_module *module)
3107 {
3108 	if (bdev->internal.claim_module != NULL) {
3109 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3110 			    bdev->internal.claim_module->name);
3111 		return -EPERM;
3112 	}
3113 
3114 	if (desc && !desc->write) {
3115 		desc->write = true;
3116 	}
3117 
3118 	bdev->internal.claim_module = module;
3119 	return 0;
3120 }
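
/*
 * Sketch of the usual claim sequence in a virtual bdev module's
 * examine_disk() callback (names are hypothetical): open the base bdev
 * read-only, then claim it.  A successful claim upgrades the descriptor to
 * write mode and blocks any other write open of the base bdev.
 *
 *	rc = spdk_bdev_open(base, false, my_base_remove_cb, my_ctx, &desc);
 *	if (rc == 0) {
 *		rc = spdk_bdev_module_claim_bdev(base, desc, &my_module);
 *	}
 */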
3121 
3122 void
3123 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3124 {
3125 	assert(bdev->internal.claim_module != NULL);
3126 	bdev->internal.claim_module = NULL;
3127 }
3128 
3129 struct spdk_bdev *
3130 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3131 {
3132 	return desc->bdev;
3133 }
3134 
3135 void
3136 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3137 {
3138 	struct iovec *iovs;
3139 	int iovcnt;
3140 
3141 	if (bdev_io == NULL) {
3142 		return;
3143 	}
3144 
3145 	switch (bdev_io->type) {
3146 	case SPDK_BDEV_IO_TYPE_READ:
3147 		iovs = bdev_io->u.bdev.iovs;
3148 		iovcnt = bdev_io->u.bdev.iovcnt;
3149 		break;
3150 	case SPDK_BDEV_IO_TYPE_WRITE:
3151 		iovs = bdev_io->u.bdev.iovs;
3152 		iovcnt = bdev_io->u.bdev.iovcnt;
3153 		break;
3154 	default:
3155 		iovs = NULL;
3156 		iovcnt = 0;
3157 		break;
3158 	}
3159 
3160 	if (iovp) {
3161 		*iovp = iovs;
3162 	}
3163 	if (iovcntp) {
3164 		*iovcntp = iovcnt;
3165 	}
3166 }
3167 
3168 void
3169 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3170 {
3172 	if (spdk_bdev_module_list_find(bdev_module->name)) {
		SPDK_ERRLOG("Module '%s' is already registered.\n", bdev_module->name);
3174 		assert(false);
3175 	}
3176 
3177 	if (bdev_module->async_init) {
3178 		bdev_module->internal.action_in_progress = 1;
3179 	}
3180 
3181 	/*
3182 	 * Modules with examine callbacks must be initialized first, so they are
3183 	 *  ready to handle examine callbacks from later modules that will
3184 	 *  register physical bdevs.
3185 	 */
3186 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3187 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3188 	} else {
3189 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3190 	}
3191 }
3192 
3193 struct spdk_bdev_module *
3194 spdk_bdev_module_list_find(const char *name)
3195 {
3196 	struct spdk_bdev_module *bdev_module;
3197 
3198 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3199 		if (strcmp(name, bdev_module->name) == 0) {
3200 			break;
3201 		}
3202 	}
3203 
3204 	return bdev_module;
3205 }
3206 
3207 static void
3208 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3209 {
3210 	uint64_t len;
3211 
3212 	if (!success) {
3213 		bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb;
3214 		_spdk_bdev_io_complete(bdev_io);
3215 		return;
3216 	}
3217 
	/* No need to repeat the error checking from spdk_bdev_write_zeroes_blocks() - this request already passed those checks. */
3219 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
3220 		       ZERO_BUFFER_SIZE);
3221 
3222 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
3223 	bdev_io->u.bdev.iovs[0].iov_len = len;
3224 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
3225 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
3226 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
3227 
	/* If this round completes the I/O, change the callback back to the original user callback. */
3229 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
3230 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
3231 	} else {
3232 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
3233 	}
3234 	spdk_bdev_io_submit(bdev_io);
3235 }
3236 
3237 struct set_qos_limit_ctx {
3238 	void (*cb_fn)(void *cb_arg, int status);
3239 	void *cb_arg;
3240 	struct spdk_bdev *bdev;
3241 };
3242 
3243 static void
3244 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3245 {
3246 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3247 	ctx->bdev->internal.qos_mod_in_progress = false;
3248 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3249 
3250 	ctx->cb_fn(ctx->cb_arg, status);
3251 	free(ctx);
3252 }
3253 
3254 static void
3255 _spdk_bdev_disable_qos_done(void *cb_arg)
3256 {
3257 	struct set_qos_limit_ctx *ctx = cb_arg;
3258 	struct spdk_bdev *bdev = ctx->bdev;
3259 	struct spdk_bdev_io *bdev_io;
3260 	struct spdk_bdev_qos *qos;
3261 
3262 	pthread_mutex_lock(&bdev->internal.mutex);
3263 	qos = bdev->internal.qos;
3264 	bdev->internal.qos = NULL;
3265 	pthread_mutex_unlock(&bdev->internal.mutex);
3266 
3267 	while (!TAILQ_EMPTY(&qos->queued)) {
3268 		/* Send queued I/O back to their original thread for resubmission. */
3269 		bdev_io = TAILQ_FIRST(&qos->queued);
3270 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3271 
3272 		if (bdev_io->internal.io_submit_ch) {
3273 			/*
3274 			 * Channel was changed when sending it to the QoS thread - change it back
3275 			 *  before sending it back to the original thread.
3276 			 */
3277 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3278 			bdev_io->internal.io_submit_ch = NULL;
3279 		}
3280 
3281 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3282 				     _spdk_bdev_io_submit, bdev_io);
3283 	}
3284 
3285 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3286 	spdk_poller_unregister(&qos->poller);
3287 
3288 	free(qos);
3289 
3290 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3291 }
3292 
3293 static void
3294 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3295 {
3296 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3297 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3298 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3299 	struct spdk_thread *thread;
3300 
3301 	pthread_mutex_lock(&bdev->internal.mutex);
3302 	thread = bdev->internal.qos->thread;
3303 	pthread_mutex_unlock(&bdev->internal.mutex);
3304 
3305 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3306 }
3307 
3308 static void
3309 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3310 {
3311 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3312 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3313 
3314 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3315 
3316 	spdk_for_each_channel_continue(i, 0);
3317 }
3318 
3319 static void
3320 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
3321 {
3322 	struct set_qos_limit_ctx *ctx = cb_arg;
3323 	struct spdk_bdev *bdev = ctx->bdev;
3324 
3325 	pthread_mutex_lock(&bdev->internal.mutex);
3326 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3327 	pthread_mutex_unlock(&bdev->internal.mutex);
3328 
3329 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3330 }
3331 
3332 static void
3333 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3334 {
3335 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3336 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3337 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3338 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3339 	int rc;
3340 
3341 	pthread_mutex_lock(&bdev->internal.mutex);
3342 	rc = _spdk_bdev_enable_qos(bdev, bdev_ch);
3343 	pthread_mutex_unlock(&bdev->internal.mutex);
3344 	spdk_for_each_channel_continue(i, rc);
3345 }
3346 
3347 static void
3348 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3349 {
3350 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3351 
3352 	_spdk_bdev_set_qos_limit_done(ctx, status);
3353 }
3354 
3355 void
3356 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
3357 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3358 {
3359 	struct set_qos_limit_ctx *ctx;
3360 
3361 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
3362 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
3363 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
3364 		cb_fn(cb_arg, -EINVAL);
3365 		return;
3366 	}
3367 
3368 	ctx = calloc(1, sizeof(*ctx));
3369 	if (ctx == NULL) {
3370 		cb_fn(cb_arg, -ENOMEM);
3371 		return;
3372 	}
3373 
3374 	ctx->cb_fn = cb_fn;
3375 	ctx->cb_arg = cb_arg;
3376 	ctx->bdev = bdev;
3377 
3378 	pthread_mutex_lock(&bdev->internal.mutex);
3379 	if (bdev->internal.qos_mod_in_progress) {
3380 		pthread_mutex_unlock(&bdev->internal.mutex);
3381 		free(ctx);
3382 		cb_fn(cb_arg, -EAGAIN);
3383 		return;
3384 	}
3385 	bdev->internal.qos_mod_in_progress = true;
3386 
3387 	if (ios_per_sec > 0) {
3388 		if (bdev->internal.qos == NULL) {
3389 			/* Enabling */
3390 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3391 			if (!bdev->internal.qos) {
3392 				pthread_mutex_unlock(&bdev->internal.mutex);
3393 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3394 				free(ctx);
3395 				cb_fn(cb_arg, -ENOMEM);
3396 				return;
3397 			}
3398 
3399 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3400 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3401 					      _spdk_bdev_enable_qos_msg, ctx,
3402 					      _spdk_bdev_enable_qos_done);
3403 		} else {
3404 			/* Updating */
3405 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3406 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3407 		}
3408 	} else {
3409 		if (bdev->internal.qos != NULL) {
3410 			/* Disabling */
3411 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3412 					      _spdk_bdev_disable_qos_msg, ctx,
3413 					      _spdk_bdev_disable_qos_msg_done);
3414 		} else {
3415 			pthread_mutex_unlock(&bdev->internal.mutex);
3416 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3417 			return;
3418 		}
3419 	}
3420 
3421 	pthread_mutex_unlock(&bdev->internal.mutex);
3422 }
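
/*
 * Illustrative calls with a hypothetical callback: enable a 20000 IOPS
 * limit, then disable QoS again by passing 0.  Nonzero limits must be a
 * multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC.
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, my_qos_done, my_ctx);
 *	...
 *	spdk_bdev_set_qos_limit_iops(bdev, 0, my_qos_done, my_ctx);
 */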
3423 
3424 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3425