xref: /spdk/lib/bdev/bdev.c (revision 0d2745c94b03b159020b6812c6caddb4922e4449)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk/bdev_module.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
69 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
70 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10
71 
72 enum spdk_bdev_qos_type {
73 	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
74 	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
75 	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
76 };
77 
78 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};
79 
80 struct spdk_bdev_mgr {
81 	struct spdk_mempool *bdev_io_pool;
82 
83 	struct spdk_mempool *buf_small_pool;
84 	struct spdk_mempool *buf_large_pool;
85 
86 	void *zero_buffer;
87 
88 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
89 
90 	TAILQ_HEAD(, spdk_bdev) bdevs;
91 
92 	bool init_complete;
93 	bool module_init_complete;
94 
95 #ifdef SPDK_CONFIG_VTUNE
96 	__itt_domain	*domain;
97 #endif
98 };
99 
100 static struct spdk_bdev_mgr g_bdev_mgr = {
101 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
102 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
103 	.init_complete = false,
104 	.module_init_complete = false,
105 };
106 
107 static struct spdk_bdev_opts	g_bdev_opts = {
108 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
109 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
110 };
111 
112 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
113 static void			*g_init_cb_arg = NULL;
114 
115 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
116 static void			*g_fini_cb_arg = NULL;
117 static struct spdk_thread	*g_fini_thread = NULL;
118 
119 struct spdk_bdev_qos {
120 	/** Rate limit, in I/O per second */
121 	uint64_t iops_rate_limit;
122 
123 	/** Rate limit, in byte per second */
124 	uint64_t byte_rate_limit;
125 
126 	/** The channel that all I/O are funneled through */
127 	struct spdk_bdev_channel *ch;
128 
129 	/** The thread on which the poller is running. */
130 	struct spdk_thread *thread;
131 
132 	/** Queue of I/O waiting to be issued. */
133 	bdev_io_tailq_t queued;
134 
135 	/** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
136 	 *  only valid for the master channel which manages the outstanding IOs. */
137 	uint64_t max_ios_per_timeslice;
138 
139 	/** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and
140 	 *  only valid for the master channel which manages the outstanding IOs. */
141 	uint64_t max_byte_per_timeslice;
142 
143 	/** I/O submitted in one timeslice (e.g., 1ms) */
144 	uint64_t io_submitted_this_timeslice;
145 
146 	/** Bytes submitted in one timeslice (e.g., 1ms) */
147 	uint64_t byte_submitted_this_timeslice;
148 
149 	/** Poller that processes queued I/O commands each time slice. */
150 	struct spdk_poller *poller;
151 };
152 
153 struct spdk_bdev_mgmt_channel {
154 	bdev_io_stailq_t need_buf_small;
155 	bdev_io_stailq_t need_buf_large;
156 
157 	/*
158 	 * Each thread keeps a cache of bdev_io - this allows
159 	 *  bdev threads which are *not* DPDK threads to still
160 	 *  benefit from a per-thread bdev_io cache.  Without
161 	 *  this, non-DPDK threads fetching from the mempool
162 	 *  incur a cmpxchg on get and put.
163 	 */
164 	bdev_io_stailq_t per_thread_cache;
165 	uint32_t	per_thread_cache_count;
166 	uint32_t	bdev_io_cache_size;
167 
168 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
169 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
170 };
171 
172 /*
173  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
174  * will queue their IO awaiting retry here. This makes it possible to retry sending
175  * IO to one bdev after IO from another bdev completes.
176  */
177 struct spdk_bdev_shared_resource {
178 	/* The bdev management channel */
179 	struct spdk_bdev_mgmt_channel *mgmt_ch;
180 
181 	/*
182 	 * Count of I/O submitted to bdev module and waiting for completion.
183 	 * Incremented before submit_request() is called on an spdk_bdev_io.
184 	 */
185 	uint64_t		io_outstanding;
186 
187 	/*
188 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
189 	 *  on this channel.
190 	 */
191 	bdev_io_tailq_t		nomem_io;
192 
193 	/*
194 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
195 	 */
196 	uint64_t		nomem_threshold;
197 
198 	/* I/O channel allocated by a bdev module */
199 	struct spdk_io_channel	*shared_ch;
200 
201 	/* Refcount of bdev channels using this resource */
202 	uint32_t		ref;
203 
204 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
205 };
206 
207 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
208 #define BDEV_CH_QOS_ENABLED		(1 << 1)
209 
210 struct spdk_bdev_channel {
211 	struct spdk_bdev	*bdev;
212 
213 	/* The channel for the underlying device */
214 	struct spdk_io_channel	*channel;
215 
216 	/* Per io_device per thread data */
217 	struct spdk_bdev_shared_resource *shared_resource;
218 
219 	struct spdk_bdev_io_stat stat;
220 
221 	/*
222 	 * Count of I/O submitted through this channel and waiting for completion.
223 	 * Incremented before submit_request() is called on an spdk_bdev_io.
224 	 */
225 	uint64_t		io_outstanding;
226 
227 	bdev_io_tailq_t		queued_resets;
228 
229 	uint32_t		flags;
230 
231 #ifdef SPDK_CONFIG_VTUNE
232 	uint64_t		start_tsc;
233 	uint64_t		interval_tsc;
234 	__itt_string_handle	*handle;
235 	struct spdk_bdev_io_stat prev_stat;
236 #endif
237 
238 };
239 
240 struct spdk_bdev_desc {
241 	struct spdk_bdev		*bdev;
242 	spdk_bdev_remove_cb_t		remove_cb;
243 	void				*remove_ctx;
244 	bool				remove_scheduled;
245 	bool				write;
246 	TAILQ_ENTRY(spdk_bdev_desc)	link;
247 };
248 
249 struct spdk_bdev_iostat_ctx {
250 	struct spdk_bdev_io_stat *stat;
251 	spdk_bdev_get_device_stat_cb cb;
252 	void *cb_arg;
253 };
254 
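/*
 * Explanatory note (not from the original source, based on how these macros are
 * used below): the bdev pointer itself may already be registered as an io_device
 * by the bdev module that owns it, so the bdev layer offsets the address by one
 * byte to obtain a distinct io_device key for its per-bdev channels;
 * __bdev_from_io_dev() simply reverses that offset.
 */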
255 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
256 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
257 
258 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
259 
260 void
261 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
262 {
263 	*opts = g_bdev_opts;
264 }
265 
266 int
267 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
268 {
269 	uint32_t min_pool_size;
270 
271 	/*
272 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
273 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
274 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
275 	 */
276 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
277 	if (opts->bdev_io_pool_size < min_pool_size) {
278 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
279 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
280 			    spdk_thread_get_count());
281 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
282 		return -1;
283 	}
284 
285 	g_bdev_opts = *opts;
286 	return 0;
287 }
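
/*
 * Usage sketch (hypothetical caller, values are illustrative): options are
 * read-modify-written before subsystem initialization so unrelated fields keep
 * their defaults.
 *
 *     struct spdk_bdev_opts opts;
 *
 *     spdk_bdev_get_opts(&opts);
 *     opts.bdev_io_pool_size = 128 * 1024;
 *     if (spdk_bdev_set_opts(&opts) != 0) {
 *             // pool size was too small for bdev_io_cache_size * (threads + 1)
 *     }
 */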
288 
289 struct spdk_bdev *
290 spdk_bdev_first(void)
291 {
292 	struct spdk_bdev *bdev;
293 
294 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
295 	if (bdev) {
296 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
297 	}
298 
299 	return bdev;
300 }
301 
302 struct spdk_bdev *
303 spdk_bdev_next(struct spdk_bdev *prev)
304 {
305 	struct spdk_bdev *bdev;
306 
307 	bdev = TAILQ_NEXT(prev, internal.link);
308 	if (bdev) {
309 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
310 	}
311 
312 	return bdev;
313 }
314 
315 static struct spdk_bdev *
316 _bdev_next_leaf(struct spdk_bdev *bdev)
317 {
318 	while (bdev != NULL) {
319 		if (bdev->internal.claim_module == NULL) {
320 			return bdev;
321 		} else {
322 			bdev = TAILQ_NEXT(bdev, internal.link);
323 		}
324 	}
325 
326 	return bdev;
327 }
328 
329 struct spdk_bdev *
330 spdk_bdev_first_leaf(void)
331 {
332 	struct spdk_bdev *bdev;
333 
334 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
335 
336 	if (bdev) {
337 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
338 	}
339 
340 	return bdev;
341 }
342 
343 struct spdk_bdev *
344 spdk_bdev_next_leaf(struct spdk_bdev *prev)
345 {
346 	struct spdk_bdev *bdev;
347 
348 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
349 
350 	if (bdev) {
351 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
352 	}
353 
354 	return bdev;
355 }
356 
357 struct spdk_bdev *
358 spdk_bdev_get_by_name(const char *bdev_name)
359 {
360 	struct spdk_bdev_alias *tmp;
361 	struct spdk_bdev *bdev = spdk_bdev_first();
362 
363 	while (bdev != NULL) {
364 		if (strcmp(bdev_name, bdev->name) == 0) {
365 			return bdev;
366 		}
367 
368 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
369 			if (strcmp(bdev_name, tmp->alias) == 0) {
370 				return bdev;
371 			}
372 		}
373 
374 		bdev = spdk_bdev_next(bdev);
375 	}
376 
377 	return NULL;
378 }
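
/*
 * Usage sketch (hypothetical): look a bdev up by name or alias, falling back to
 * iterating all registered bdevs.  The name "Nvme0n1" is illustrative only.
 *
 *     struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *
 *     if (bdev == NULL) {
 *             for (bdev = spdk_bdev_first(); bdev != NULL; bdev = spdk_bdev_next(bdev)) {
 *                     printf("registered bdev: %s\n", spdk_bdev_get_name(bdev));
 *             }
 *     }
 */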
379 
380 size_t
381 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
382 {
383 	struct iovec **iovs;
384 	int *iovcnt;
385 	void *aligned_buf;
386 
387 	iovs = &bdev_io->u.bdev.iovs;
388 	iovcnt = &bdev_io->u.bdev.iovcnt;
389 
390 	if (*iovs == NULL || *iovcnt == 0) {
391 		*iovs = &bdev_io->iov;
392 		*iovcnt = 1;
393 	}
394 
395 	if (buf != NULL) {
396 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
397 		len = len - ((uintptr_t)aligned_buf - (uintptr_t)buf);
398 	} else {
399 		aligned_buf = NULL;
400 		assert(len == 0);
401 	}
402 
403 	(*iovs)[0].iov_base = aligned_buf;
404 	(*iovs)[0].iov_len = len;
405 
406 	return len;
407 }
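
/*
 * Worked example of the 512-byte alignment above (illustrative addresses): with
 * buf = 0x1001 and len = 0x2000, aligned_buf becomes (0x1001 + 511) & ~511UL =
 * 0x1200, and the usable length returned shrinks by the 0x1FF skipped bytes,
 * i.e. 0x2000 - 0x1FF = 0x1E01.
 */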
408 
409 static void
410 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
411 {
412 	struct spdk_mempool *pool;
413 	struct spdk_bdev_io *tmp;
414 	void *buf;
415 	bdev_io_stailq_t *stailq;
416 	struct spdk_bdev_mgmt_channel *ch;
417 	size_t len;
418 
419 	assert(bdev_io->u.bdev.iovcnt == 1);
420 
421 	buf = bdev_io->internal.buf;
422 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
423 
424 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
425 		pool = g_bdev_mgr.buf_small_pool;
426 		stailq = &ch->need_buf_small;
427 		len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512;
428 	} else {
429 		pool = g_bdev_mgr.buf_large_pool;
430 		stailq = &ch->need_buf_large;
431 		len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512;
432 	}
433 
434 	if (STAILQ_EMPTY(stailq)) {
435 		spdk_mempool_put(pool, buf);
436 	} else {
437 		tmp = STAILQ_FIRST(stailq);
438 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
439 		len = spdk_bdev_io_set_buf(tmp, buf, len);
440 		if (len < tmp->internal.buf_len) {
441 			SPDK_ERRLOG("Unable to use buffer due to alignment\n");
442 			spdk_mempool_put(pool, buf);
443 			spdk_bdev_io_set_buf(tmp, NULL, 0);
444 			return;
445 		}
446 		tmp->internal.buf = buf;
447 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
448 	}
449 }
450 
451 void
452 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
453 {
454 	struct spdk_mempool *pool;
455 	bdev_io_stailq_t *stailq;
456 	void *buf = NULL;
457 	struct spdk_bdev_mgmt_channel *mgmt_ch;
458 	size_t buf_len;
459 
460 	assert(cb != NULL);
461 	assert(bdev_io->u.bdev.iovs != NULL);
462 
463 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
464 		/* Buffer already present */
465 		cb(bdev_io->internal.ch->channel, bdev_io);
466 		return;
467 	}
468 
469 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
470 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
471 
472 	bdev_io->internal.buf_len = len;
473 	bdev_io->internal.get_buf_cb = cb;
474 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
475 		pool = g_bdev_mgr.buf_small_pool;
476 		stailq = &mgmt_ch->need_buf_small;
477 		buf_len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512;
478 	} else {
479 		pool = g_bdev_mgr.buf_large_pool;
480 		stailq = &mgmt_ch->need_buf_large;
481 		buf_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512;
482 	}
483 
484 	buf = spdk_mempool_get(pool);
485 
486 	if (!buf) {
487 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
488 	} else {
489 		size_t aligned_len;
490 
491 		aligned_len = spdk_bdev_io_set_buf(bdev_io, buf, buf_len);
492 		if (aligned_len < len) {
493 			SPDK_ERRLOG("Unable to use buffer after alignment calculations.\n");
494 			spdk_mempool_put(pool, buf);
495 			spdk_bdev_io_set_buf(bdev_io, NULL, 0);
496 			STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
497 			return;
498 		}
499 
500 		bdev_io->internal.buf = buf;
501 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
502 	}
503 }
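
/*
 * Usage sketch (hypothetical bdev module read path): a module whose
 * submit_request() needs a data buffer for a READ defers the actual submission
 * until the buffer is available.  example_read_get_buf_cb() is an illustrative
 * name, not part of this file.
 *
 *     static void
 *     example_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *     {
 *             // bdev_io->u.bdev.iovs[0] now points at an aligned buffer of at
 *             // least the requested length; issue the read to the device here.
 *     }
 *
 *     ...
 *     spdk_bdev_io_get_buf(bdev_io, example_read_get_buf_cb,
 *                          bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 */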
504 
505 static int
506 spdk_bdev_module_get_max_ctx_size(void)
507 {
508 	struct spdk_bdev_module *bdev_module;
509 	int max_bdev_module_size = 0;
510 
511 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
512 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
513 			max_bdev_module_size = bdev_module->get_ctx_size();
514 		}
515 	}
516 
517 	return max_bdev_module_size;
518 }
519 
520 void
521 spdk_bdev_config_text(FILE *fp)
522 {
523 	struct spdk_bdev_module *bdev_module;
524 
525 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
526 		if (bdev_module->config_text) {
527 			bdev_module->config_text(fp);
528 		}
529 	}
530 }
531 
532 void
533 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
534 {
535 	struct spdk_bdev_module *bdev_module;
536 	struct spdk_bdev *bdev;
537 
538 	assert(w != NULL);
539 
540 	spdk_json_write_array_begin(w);
541 
542 	spdk_json_write_object_begin(w);
543 	spdk_json_write_named_string(w, "method", "set_bdev_options");
544 	spdk_json_write_name(w, "params");
545 	spdk_json_write_object_begin(w);
546 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
547 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
548 	spdk_json_write_object_end(w);
549 	spdk_json_write_object_end(w);
550 
551 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
552 		if (bdev_module->config_json) {
553 			bdev_module->config_json(w);
554 		}
555 	}
556 
557 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
558 		spdk_bdev_config_json(bdev, w);
559 	}
560 
561 	spdk_json_write_array_end(w);
562 }
563 
564 static int
565 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
566 {
567 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
568 	struct spdk_bdev_io *bdev_io;
569 	uint32_t i;
570 
571 	STAILQ_INIT(&ch->need_buf_small);
572 	STAILQ_INIT(&ch->need_buf_large);
573 
574 	STAILQ_INIT(&ch->per_thread_cache);
575 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
576 
577 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
578 	ch->per_thread_cache_count = 0;
579 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
580 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
581 		assert(bdev_io != NULL);
582 		ch->per_thread_cache_count++;
583 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
584 	}
585 
586 	TAILQ_INIT(&ch->shared_resources);
587 	TAILQ_INIT(&ch->io_wait_queue);
588 
589 	return 0;
590 }
591 
592 static void
593 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
594 {
595 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
596 	struct spdk_bdev_io *bdev_io;
597 
598 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
599 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
600 	}
601 
602 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
603 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
604 	}
605 
606 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
607 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
608 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
609 		ch->per_thread_cache_count--;
610 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
611 	}
612 
613 	assert(ch->per_thread_cache_count == 0);
614 }
615 
616 static void
617 spdk_bdev_init_complete(int rc)
618 {
619 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
620 	void *cb_arg = g_init_cb_arg;
621 	struct spdk_bdev_module *m;
622 
623 	g_bdev_mgr.init_complete = true;
624 	g_init_cb_fn = NULL;
625 	g_init_cb_arg = NULL;
626 
627 	/*
628 	 * For modules that need to know when subsystem init is complete,
629 	 * inform them now.
630 	 */
631 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
632 		if (m->init_complete) {
633 			m->init_complete();
634 		}
635 	}
636 
637 	cb_fn(cb_arg, rc);
638 }
639 
640 static void
641 spdk_bdev_module_action_complete(void)
642 {
643 	struct spdk_bdev_module *m;
644 
645 	/*
646 	 * Don't finish bdev subsystem initialization if
647 	 * module pre-initialization is still in progress, or
648 	 * the subsystem been already initialized.
649 	 */
650 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
651 		return;
652 	}
653 
654 	/*
655 	 * Check all bdev modules for inits/examinations in progress. If any
656 	 * exist, return immediately since we cannot finish bdev subsystem
657 	 * initialization until all are completed.
658 	 */
659 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
660 		if (m->internal.action_in_progress > 0) {
661 			return;
662 		}
663 	}
664 
665 	/*
666 	 * Modules already finished initialization - now that all
667 	 * the bdev modules have finished their asynchronous I/O
668 	 * processing, the entire bdev layer can be marked as complete.
669 	 */
670 	spdk_bdev_init_complete(0);
671 }
672 
673 static void
674 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
675 {
676 	assert(module->internal.action_in_progress > 0);
677 	module->internal.action_in_progress--;
678 	spdk_bdev_module_action_complete();
679 }
680 
681 void
682 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
683 {
684 	spdk_bdev_module_action_done(module);
685 }
686 
687 void
688 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
689 {
690 	spdk_bdev_module_action_done(module);
691 }
692 
693 static int
694 spdk_bdev_modules_init(void)
695 {
696 	struct spdk_bdev_module *module;
697 	int rc = 0;
698 
699 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
700 		rc = module->module_init();
701 		if (rc != 0) {
702 			break;
703 		}
704 	}
705 
706 	g_bdev_mgr.module_init_complete = true;
707 	return rc;
708 }
709 
710 void
711 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
712 {
713 	struct spdk_conf_section *sp;
714 	struct spdk_bdev_opts bdev_opts;
715 	int32_t bdev_io_pool_size, bdev_io_cache_size;
716 	int cache_size;
717 	int rc = 0;
718 	char mempool_name[32];
719 
720 	assert(cb_fn != NULL);
721 
722 	sp = spdk_conf_find_section(NULL, "Bdev");
723 	if (sp != NULL) {
724 		spdk_bdev_get_opts(&bdev_opts);
725 
726 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
727 		if (bdev_io_pool_size >= 0) {
728 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
729 		}
730 
731 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
732 		if (bdev_io_cache_size >= 0) {
733 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
734 		}
735 
736 		if (spdk_bdev_set_opts(&bdev_opts)) {
737 			spdk_bdev_init_complete(-1);
738 			return;
739 		}
740 
741 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
742 	}
743 
744 	g_init_cb_fn = cb_fn;
745 	g_init_cb_arg = cb_arg;
746 
747 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
748 
749 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
750 				  g_bdev_opts.bdev_io_pool_size,
751 				  sizeof(struct spdk_bdev_io) +
752 				  spdk_bdev_module_get_max_ctx_size(),
753 				  0,
754 				  SPDK_ENV_SOCKET_ID_ANY);
755 
756 	if (g_bdev_mgr.bdev_io_pool == NULL) {
757 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
758 		spdk_bdev_init_complete(-1);
759 		return;
760 	}
761 
762 	/**
763 	 * Ensure no more than half of the total buffers end up in local caches, by
764 	 *   using spdk_thread_get_count() to determine how many local caches we need
765 	 *   to account for.
766 	 */
767 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
768 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
769 
770 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
771 				    BUF_SMALL_POOL_SIZE,
772 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
773 				    cache_size,
774 				    SPDK_ENV_SOCKET_ID_ANY);
775 	if (!g_bdev_mgr.buf_small_pool) {
776 		SPDK_ERRLOG("create rbuf small pool failed\n");
777 		spdk_bdev_init_complete(-1);
778 		return;
779 	}
780 
781 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
782 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
783 
784 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
785 				    BUF_LARGE_POOL_SIZE,
786 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
787 				    cache_size,
788 				    SPDK_ENV_SOCKET_ID_ANY);
789 	if (!g_bdev_mgr.buf_large_pool) {
790 		SPDK_ERRLOG("create rbuf large pool failed\n");
791 		spdk_bdev_init_complete(-1);
792 		return;
793 	}
794 
795 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
796 				 NULL);
797 	if (!g_bdev_mgr.zero_buffer) {
798 		SPDK_ERRLOG("create bdev zero buffer failed\n");
799 		spdk_bdev_init_complete(-1);
800 		return;
801 	}
802 
803 #ifdef SPDK_CONFIG_VTUNE
804 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
805 #endif
806 
807 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
808 				spdk_bdev_mgmt_channel_destroy,
809 				sizeof(struct spdk_bdev_mgmt_channel));
810 
811 	rc = spdk_bdev_modules_init();
812 	if (rc != 0) {
813 		SPDK_ERRLOG("bdev modules init failed\n");
814 		spdk_bdev_init_complete(-1);
815 		return;
816 	}
817 
818 	spdk_bdev_module_action_complete();
819 }
820 
821 static void
822 spdk_bdev_mgr_unregister_cb(void *io_device)
823 {
824 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
825 
826 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
827 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
828 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
829 			    g_bdev_opts.bdev_io_pool_size);
830 	}
831 
832 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
833 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
834 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
835 			    BUF_SMALL_POOL_SIZE);
836 		assert(false);
837 	}
838 
839 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
840 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
841 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
842 			    BUF_LARGE_POOL_SIZE);
843 		assert(false);
844 	}
845 
846 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
847 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
848 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
849 	spdk_dma_free(g_bdev_mgr.zero_buffer);
850 
851 	cb_fn(g_fini_cb_arg);
852 	g_fini_cb_fn = NULL;
853 	g_fini_cb_arg = NULL;
854 }
855 
856 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
857 
858 static void
859 spdk_bdev_module_finish_iter(void *arg)
860 {
861 	struct spdk_bdev_module *bdev_module;
862 
863 	/* Start iterating from the last touched module */
864 	if (!g_resume_bdev_module) {
865 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
866 	} else {
867 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
868 	}
869 
870 	while (bdev_module) {
871 		if (bdev_module->async_fini) {
872 			/* Save our place so we can resume later. We must
873 			 * save the variable here, before calling module_fini()
874 			 * below, because in some cases the module may immediately
875 			 * call spdk_bdev_module_finish_done() and re-enter
876 			 * this function to continue iterating. */
877 			g_resume_bdev_module = bdev_module;
878 		}
879 
880 		if (bdev_module->module_fini) {
881 			bdev_module->module_fini();
882 		}
883 
884 		if (bdev_module->async_fini) {
885 			return;
886 		}
887 
888 		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
889 	}
890 
891 	g_resume_bdev_module = NULL;
892 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
893 }
894 
895 void
896 spdk_bdev_module_finish_done(void)
897 {
898 	if (spdk_get_thread() != g_fini_thread) {
899 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
900 	} else {
901 		spdk_bdev_module_finish_iter(NULL);
902 	}
903 }
904 
905 static void
906 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
907 {
908 	struct spdk_bdev *bdev = cb_arg;
909 
910 	if (bdeverrno && bdev) {
911 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
912 			     bdev->name);
913 
914 		/*
915 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
916 		 *  bdev; try to continue by manually removing this bdev from the list and continuing
917 		 *  with the next bdev in the list.
918 		 */
919 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
920 	}
921 
922 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
923 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
924 		/*
925 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
926 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
927 		 * after returning.
928 		 */
929 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
930 		return;
931 	}
932 
933 	/*
934 	 * Unregister the first bdev in the list.
935 	 *
936 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
937 	 *  calling the remove_cb of the descriptors first.
938 	 *
939 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
940 	 *  will be called again via the unregister completion callback to continue the cleanup
941 	 *  process with the next bdev.
942 	 */
943 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
944 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
945 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
946 }
947 
948 void
949 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
950 {
951 	assert(cb_fn != NULL);
952 
953 	g_fini_thread = spdk_get_thread();
954 
955 	g_fini_cb_fn = cb_fn;
956 	g_fini_cb_arg = cb_arg;
957 
958 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
959 }
960 
961 static struct spdk_bdev_io *
962 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
963 {
964 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
965 	struct spdk_bdev_io *bdev_io;
966 
967 	if (ch->per_thread_cache_count > 0) {
968 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
969 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
970 		ch->per_thread_cache_count--;
971 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
972 		/*
973 		 * Don't try to look for bdev_ios in the global pool if there are
974 		 * waiters on bdev_ios - we don't want this caller to jump the line.
975 		 */
976 		bdev_io = NULL;
977 	} else {
978 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
979 	}
980 
981 	return bdev_io;
982 }
983 
984 void
985 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
986 {
987 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
988 
989 	assert(bdev_io != NULL);
990 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
991 
992 	if (bdev_io->internal.buf != NULL) {
993 		spdk_bdev_io_put_buf(bdev_io);
994 	}
995 
996 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
997 		ch->per_thread_cache_count++;
998 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
999 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1000 			struct spdk_bdev_io_wait_entry *entry;
1001 
1002 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1003 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1004 			entry->cb_fn(entry->cb_arg);
1005 		}
1006 	} else {
1007 		/* We should never have a full cache with entries on the io wait queue. */
1008 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1009 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1010 	}
1011 }
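
/*
 * Usage sketch (hypothetical caller): when an I/O submission call returns
 * -ENOMEM because no bdev_io could be allocated, the caller can register an
 * spdk_bdev_io_wait_entry so it is retried once spdk_bdev_free_io() above
 * returns an entry to this channel's cache.  This assumes
 * spdk_bdev_queue_io_wait() from the public bdev API; retry_read(), wait_entry,
 * io_ch and read_ctx are illustrative names only.
 *
 *     static void
 *     retry_read(void *cb_arg)
 *     {
 *             // re-issue the spdk_bdev_read_blocks() call that previously failed
 *     }
 *
 *     ...
 *     if (rc == -ENOMEM) {
 *             wait_entry.bdev = bdev;
 *             wait_entry.cb_fn = retry_read;
 *             wait_entry.cb_arg = read_ctx;
 *             spdk_bdev_queue_io_wait(bdev, io_ch, &wait_entry);
 *     }
 */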
1012 
1013 static uint64_t
1014 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1015 {
1016 	struct spdk_bdev	*bdev = bdev_io->bdev;
1017 
1018 	switch (bdev_io->type) {
1019 	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
1020 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1021 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1022 		return bdev_io->u.nvme_passthru.nbytes;
1023 	case SPDK_BDEV_IO_TYPE_READ:
1024 	case SPDK_BDEV_IO_TYPE_WRITE:
1025 	case SPDK_BDEV_IO_TYPE_UNMAP:
1026 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1027 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1028 	default:
1029 		return 0;
1030 	}
1031 }
1032 
1033 static void
1034 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
1035 {
1036 	struct spdk_bdev_io		*bdev_io = NULL;
1037 	struct spdk_bdev		*bdev = ch->bdev;
1038 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
1039 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1040 
1041 	while (!TAILQ_EMPTY(&qos->queued)) {
1042 		if (qos->max_ios_per_timeslice > 0 &&
1043 		    qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) {
1044 			break;
1045 		}
1046 
1047 		if (qos->max_byte_per_timeslice > 0 &&
1048 		    qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) {
1049 			break;
1050 		}
1051 
1052 		bdev_io = TAILQ_FIRST(&qos->queued);
1053 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1054 		qos->io_submitted_this_timeslice++;
1055 		qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io);
1056 		ch->io_outstanding++;
1057 		shared_resource->io_outstanding++;
1058 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1059 	}
1060 }
1061 
1062 static void
1063 _spdk_bdev_io_submit(void *ctx)
1064 {
1065 	struct spdk_bdev_io *bdev_io = ctx;
1066 	struct spdk_bdev *bdev = bdev_io->bdev;
1067 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1068 	struct spdk_io_channel *ch = bdev_ch->channel;
1069 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1070 
1071 	bdev_io->internal.submit_tsc = spdk_get_ticks();
1072 	bdev_ch->io_outstanding++;
1073 	shared_resource->io_outstanding++;
1074 	bdev_io->internal.in_submit_request = true;
1075 	if (spdk_likely(bdev_ch->flags == 0)) {
1076 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1077 			bdev->fn_table->submit_request(ch, bdev_io);
1078 		} else {
1079 			bdev_ch->io_outstanding--;
1080 			shared_resource->io_outstanding--;
1081 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1082 		}
1083 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1084 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1085 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1086 		bdev_ch->io_outstanding--;
1087 		shared_resource->io_outstanding--;
1088 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1089 		_spdk_bdev_qos_io_submit(bdev_ch);
1090 	} else {
1091 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1092 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1093 	}
1094 	bdev_io->internal.in_submit_request = false;
1095 }
1096 
1097 static void
1098 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1099 {
1100 	struct spdk_bdev *bdev = bdev_io->bdev;
1101 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1102 
1103 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1104 
1105 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1106 		if (thread == bdev->internal.qos->thread) {
1107 			_spdk_bdev_io_submit(bdev_io);
1108 		} else {
1109 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1110 			bdev_io->internal.ch = bdev->internal.qos->ch;
1111 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1112 		}
1113 	} else {
1114 		_spdk_bdev_io_submit(bdev_io);
1115 	}
1116 }
1117 
1118 static void
1119 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1120 {
1121 	struct spdk_bdev *bdev = bdev_io->bdev;
1122 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1123 	struct spdk_io_channel *ch = bdev_ch->channel;
1124 
1125 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1126 
1127 	bdev_io->internal.in_submit_request = true;
1128 	bdev->fn_table->submit_request(ch, bdev_io);
1129 	bdev_io->internal.in_submit_request = false;
1130 }
1131 
1132 static void
1133 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1134 		  struct spdk_bdev *bdev, void *cb_arg,
1135 		  spdk_bdev_io_completion_cb cb)
1136 {
1137 	bdev_io->bdev = bdev;
1138 	bdev_io->internal.caller_ctx = cb_arg;
1139 	bdev_io->internal.cb = cb;
1140 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1141 	bdev_io->internal.in_submit_request = false;
1142 	bdev_io->internal.buf = NULL;
1143 	bdev_io->internal.io_submit_ch = NULL;
1144 }
1145 
1146 static bool
1147 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1148 {
1149 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1150 }
1151 
1152 bool
1153 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1154 {
1155 	bool supported;
1156 
1157 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1158 
1159 	if (!supported) {
1160 		switch (io_type) {
1161 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1162 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1163 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1164 			break;
1165 		default:
1166 			break;
1167 		}
1168 	}
1169 
1170 	return supported;
1171 }
1172 
1173 int
1174 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1175 {
1176 	if (bdev->fn_table->dump_info_json) {
1177 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1178 	}
1179 
1180 	return 0;
1181 }
1182 
1183 void
1184 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1185 {
1186 	assert(bdev != NULL);
1187 	assert(w != NULL);
1188 
1189 	if (bdev->fn_table->write_config_json) {
1190 		bdev->fn_table->write_config_json(bdev, w);
1191 	} else {
1192 		spdk_json_write_object_begin(w);
1193 		spdk_json_write_named_string(w, "name", bdev->name);
1194 		spdk_json_write_object_end(w);
1195 	}
1196 }
1197 
1198 static void
1199 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1200 {
1201 	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;
1202 
1203 	if (qos->iops_rate_limit > 0) {
1204 		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1205 					SPDK_BDEV_SEC_TO_USEC;
1206 		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
1207 						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1208 	}
1209 
1210 	if (qos->byte_rate_limit > 0) {
1211 		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1212 					 SPDK_BDEV_SEC_TO_USEC;
1213 		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
1214 						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
1215 	}
1216 }
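
/*
 * Worked example of the quota calculation above: with iops_rate_limit = 10000
 * I/O per second and a 1000 us timeslice, max_ios_per_timeslice becomes
 * 10000 * 1000 / 1000000 = 10 I/O per timeslice.  Likewise a byte_rate_limit of
 * 10 MiB/s (10485760 bytes) yields 10485760 * 1000 / 1000000 = 10485 bytes,
 * clamped to at least SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE (512) if smaller.
 */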
1217 
1218 static int
1219 spdk_bdev_channel_poll_qos(void *arg)
1220 {
1221 	struct spdk_bdev_qos *qos = arg;
1222 
1223 	/* Reset for next round of rate limiting */
1224 	qos->io_submitted_this_timeslice = 0;
1225 	qos->byte_submitted_this_timeslice = 0;
1226 
1227 	_spdk_bdev_qos_io_submit(qos->ch);
1228 
1229 	return -1;
1230 }
1231 
1232 static void
1233 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1234 {
1235 	struct spdk_bdev_shared_resource *shared_resource;
1236 
1237 	if (!ch) {
1238 		return;
1239 	}
1240 
1241 	if (ch->channel) {
1242 		spdk_put_io_channel(ch->channel);
1243 	}
1244 
1245 	assert(ch->io_outstanding == 0);
1246 
1247 	shared_resource = ch->shared_resource;
1248 	if (shared_resource) {
1249 		assert(ch->io_outstanding == 0);
1250 		assert(shared_resource->ref > 0);
1251 		shared_resource->ref--;
1252 		if (shared_resource->ref == 0) {
1253 			assert(shared_resource->io_outstanding == 0);
1254 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1255 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1256 			free(shared_resource);
1257 		}
1258 	}
1259 }
1260 
1261 /* Caller must hold bdev->internal.mutex. */
1262 static int
1263 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1264 {
1265 	struct spdk_bdev_qos *qos = bdev->internal.qos;
1266 
1267 	/* Rate limiting on this bdev enabled */
1268 	if (qos) {
1269 		if (qos->ch == NULL) {
1270 			struct spdk_io_channel *io_ch;
1271 
1272 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1273 				      bdev->name, spdk_get_thread());
1274 
1275 			/* No qos channel has been selected, so set one up */
1276 
1277 			/* Take another reference to ch */
1278 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1279 			qos->ch = ch;
1280 
1281 			qos->thread = spdk_io_channel_get_thread(io_ch);
1282 
1283 			TAILQ_INIT(&qos->queued);
1284 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1285 			qos->io_submitted_this_timeslice = 0;
1286 			qos->byte_submitted_this_timeslice = 0;
1287 
1288 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1289 							   qos,
1290 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1291 		}
1292 
1293 		ch->flags |= BDEV_CH_QOS_ENABLED;
1294 	}
1295 
1296 	return 0;
1297 }
1298 
1299 static int
1300 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1301 {
1302 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1303 	struct spdk_bdev_channel	*ch = ctx_buf;
1304 	struct spdk_io_channel		*mgmt_io_ch;
1305 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1306 	struct spdk_bdev_shared_resource *shared_resource;
1307 
1308 	ch->bdev = bdev;
1309 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1310 	if (!ch->channel) {
1311 		return -1;
1312 	}
1313 
1314 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1315 	if (!mgmt_io_ch) {
1316 		return -1;
1317 	}
1318 
1319 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1320 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1321 		if (shared_resource->shared_ch == ch->channel) {
1322 			spdk_put_io_channel(mgmt_io_ch);
1323 			shared_resource->ref++;
1324 			break;
1325 		}
1326 	}
1327 
1328 	if (shared_resource == NULL) {
1329 		shared_resource = calloc(1, sizeof(*shared_resource));
1330 		if (shared_resource == NULL) {
1331 			spdk_put_io_channel(mgmt_io_ch);
1332 			return -1;
1333 		}
1334 
1335 		shared_resource->mgmt_ch = mgmt_ch;
1336 		shared_resource->io_outstanding = 0;
1337 		TAILQ_INIT(&shared_resource->nomem_io);
1338 		shared_resource->nomem_threshold = 0;
1339 		shared_resource->shared_ch = ch->channel;
1340 		shared_resource->ref = 1;
1341 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1342 	}
1343 
1344 	memset(&ch->stat, 0, sizeof(ch->stat));
1345 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1346 	ch->io_outstanding = 0;
1347 	TAILQ_INIT(&ch->queued_resets);
1348 	ch->flags = 0;
1349 	ch->shared_resource = shared_resource;
1350 
1351 #ifdef SPDK_CONFIG_VTUNE
1352 	{
1353 		char *name;
1354 		__itt_init_ittlib(NULL, 0);
1355 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1356 		if (!name) {
1357 			_spdk_bdev_channel_destroy_resource(ch);
1358 			return -1;
1359 		}
1360 		ch->handle = __itt_string_handle_create(name);
1361 		free(name);
1362 		ch->start_tsc = spdk_get_ticks();
1363 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1364 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1365 	}
1366 #endif
1367 
1368 	pthread_mutex_lock(&bdev->internal.mutex);
1369 
1370 	if (_spdk_bdev_enable_qos(bdev, ch)) {
1371 		_spdk_bdev_channel_destroy_resource(ch);
1372 		pthread_mutex_unlock(&bdev->internal.mutex);
1373 		return -1;
1374 	}
1375 
1376 	pthread_mutex_unlock(&bdev->internal.mutex);
1377 
1378 	return 0;
1379 }
1380 
1381 /*
1382  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1383  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1384  */
1385 static void
1386 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1387 {
1388 	bdev_io_stailq_t tmp;
1389 	struct spdk_bdev_io *bdev_io;
1390 
1391 	STAILQ_INIT(&tmp);
1392 
1393 	while (!STAILQ_EMPTY(queue)) {
1394 		bdev_io = STAILQ_FIRST(queue);
1395 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1396 		if (bdev_io->internal.ch == ch) {
1397 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1398 		} else {
1399 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1400 		}
1401 	}
1402 
1403 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1404 }
1405 
1406 /*
1407  * Abort I/O that are queued waiting for submission.  These types of I/O are
1408  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1409  */
1410 static void
1411 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1412 {
1413 	struct spdk_bdev_io *bdev_io, *tmp;
1414 
1415 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1416 		if (bdev_io->internal.ch == ch) {
1417 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1418 			/*
1419 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1420 			 *  been submitted to the bdev module.  Since in this case it
1421 			 *  hadn't, bump io_outstanding to account for the decrement
1422 			 *  that spdk_bdev_io_complete() will do.
1423 			 */
1424 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1425 				ch->io_outstanding++;
1426 				ch->shared_resource->io_outstanding++;
1427 			}
1428 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1429 		}
1430 	}
1431 }
1432 
1433 static void
1434 spdk_bdev_qos_channel_destroy(void *cb_arg)
1435 {
1436 	struct spdk_bdev_qos *qos = cb_arg;
1437 
1438 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1439 	spdk_poller_unregister(&qos->poller);
1440 
1441 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1442 
1443 	free(qos);
1444 }
1445 
1446 static int
1447 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1448 {
1449 	/*
1450 	 * Cleanly shutting down the QoS poller is tricky, because
1451 	 * during the asynchronous operation the user could open
1452 	 * a new descriptor and create a new channel, spawning
1453 	 * a new QoS poller.
1454 	 *
1455 	 * The strategy is to create a new QoS structure here and swap it
1456 	 * in. The shutdown path then continues to refer to the old one
1457 	 * until it completes and then releases it.
1458 	 */
1459 	struct spdk_bdev_qos *new_qos, *old_qos;
1460 
1461 	old_qos = bdev->internal.qos;
1462 
1463 	new_qos = calloc(1, sizeof(*new_qos));
1464 	if (!new_qos) {
1465 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1466 		return -ENOMEM;
1467 	}
1468 
1469 	/* Copy the old QoS data into the newly allocated structure */
1470 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1471 
1472 	/* Zero out the key parts of the QoS structure */
1473 	new_qos->ch = NULL;
1474 	new_qos->thread = NULL;
1475 	new_qos->max_ios_per_timeslice = 0;
1476 	new_qos->max_byte_per_timeslice = 0;
1477 	new_qos->io_submitted_this_timeslice = 0;
1478 	new_qos->byte_submitted_this_timeslice = 0;
1479 	new_qos->poller = NULL;
1480 	TAILQ_INIT(&new_qos->queued);
1481 
1482 	bdev->internal.qos = new_qos;
1483 
1484 	spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1485 			     old_qos);
1486 
1487 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1488 	 * been destroyed yet. The destruction path will end up waiting for the final
1489 	 * channel to be put before it releases resources. */
1490 
1491 	return 0;
1492 }
1493 
1494 static void
1495 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1496 {
1497 	struct spdk_bdev_channel	*ch = ctx_buf;
1498 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1499 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1500 
1501 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1502 		      spdk_get_thread());
1503 
1504 	mgmt_ch = shared_resource->mgmt_ch;
1505 
1506 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1507 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1508 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1509 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1510 
1511 	_spdk_bdev_channel_destroy_resource(ch);
1512 }
1513 
1514 int
1515 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1516 {
1517 	struct spdk_bdev_alias *tmp;
1518 
1519 	if (alias == NULL) {
1520 		SPDK_ERRLOG("Empty alias passed\n");
1521 		return -EINVAL;
1522 	}
1523 
1524 	if (spdk_bdev_get_by_name(alias)) {
1525 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1526 		return -EEXIST;
1527 	}
1528 
1529 	tmp = calloc(1, sizeof(*tmp));
1530 	if (tmp == NULL) {
1531 		SPDK_ERRLOG("Unable to allocate alias\n");
1532 		return -ENOMEM;
1533 	}
1534 
1535 	tmp->alias = strdup(alias);
1536 	if (tmp->alias == NULL) {
1537 		free(tmp);
1538 		SPDK_ERRLOG("Unable to allocate alias\n");
1539 		return -ENOMEM;
1540 	}
1541 
1542 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1543 
1544 	return 0;
1545 }
1546 
1547 int
1548 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1549 {
1550 	struct spdk_bdev_alias *tmp;
1551 
1552 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1553 		if (strcmp(alias, tmp->alias) == 0) {
1554 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1555 			free(tmp->alias);
1556 			free(tmp);
1557 			return 0;
1558 		}
1559 	}
1560 
1561 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1562 
1563 	return -ENOENT;
1564 }
1565 
1566 struct spdk_io_channel *
1567 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1568 {
1569 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1570 }
1571 
1572 const char *
1573 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1574 {
1575 	return bdev->name;
1576 }
1577 
1578 const char *
1579 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1580 {
1581 	return bdev->product_name;
1582 }
1583 
1584 const struct spdk_bdev_aliases_list *
1585 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1586 {
1587 	return &bdev->aliases;
1588 }
1589 
1590 uint32_t
1591 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1592 {
1593 	return bdev->blocklen;
1594 }
1595 
1596 uint64_t
1597 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1598 {
1599 	return bdev->blockcnt;
1600 }
1601 
1602 uint64_t
1603 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
1604 {
1605 	uint64_t iops_rate_limit = 0;
1606 
1607 	pthread_mutex_lock(&bdev->internal.mutex);
1608 	if (bdev->internal.qos) {
1609 		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
1610 	}
1611 	pthread_mutex_unlock(&bdev->internal.mutex);
1612 
1613 	return iops_rate_limit;
1614 }
1615 
1616 size_t
1617 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1618 {
1619 	/* TODO: push this logic down to the bdev modules */
1620 	if (bdev->need_aligned_buffer) {
1621 		return bdev->blocklen;
1622 	}
1623 
1624 	return 1;
1625 }
1626 
1627 uint32_t
1628 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1629 {
1630 	return bdev->optimal_io_boundary;
1631 }
1632 
1633 bool
1634 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1635 {
1636 	return bdev->write_cache;
1637 }
1638 
1639 const struct spdk_uuid *
1640 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1641 {
1642 	return &bdev->uuid;
1643 }
1644 
1645 uint64_t
1646 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
1647 {
1648 	return bdev->internal.measured_queue_depth;
1649 }
1650 
1651 uint64_t
1652 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
1653 {
1654 	return bdev->internal.period;
1655 }
1656 
1657 static void
1658 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
1659 {
1660 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
1661 
1662 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
1663 }
1664 
1665 static void
1666 _calculate_measured_qd(struct spdk_io_channel_iter *i)
1667 {
1668 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
1669 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
1670 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
1671 
1672 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
1673 	spdk_for_each_channel_continue(i, 0);
1674 }
1675 
1676 static int
1677 spdk_bdev_calculate_measured_queue_depth(void *ctx)
1678 {
1679 	struct spdk_bdev *bdev = ctx;
1680 	bdev->internal.temporary_queue_depth = 0;
1681 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
1682 			      _calculate_measured_qd_cpl);
1683 	return 0;
1684 }
1685 
1686 void
1687 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
1688 {
1689 	bdev->internal.period = period;
1690 
1691 	if (bdev->internal.qd_poller != NULL) {
1692 		spdk_poller_unregister(&bdev->internal.qd_poller);
1693 		bdev->internal.measured_queue_depth = UINT64_MAX;
1694 	}
1695 
1696 	if (period != 0) {
1697 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
1698 					   period);
1699 	}
1700 }
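
/*
 * Usage sketch (hypothetical; the period is assumed to be in microseconds since
 * it is passed straight to spdk_poller_register()): sample the queue depth once
 * per second, then read the measurement back later.
 *
 *     spdk_bdev_set_qd_sampling_period(bdev, 1000000);
 *     ...
 *     printf("measured qd: %" PRIu64 "\n", spdk_bdev_get_qd(bdev));
 */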
1701 
1702 int
1703 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1704 {
1705 	int ret;
1706 
1707 	pthread_mutex_lock(&bdev->internal.mutex);
1708 
1709 	/* bdev has open descriptors */
1710 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
1711 	    bdev->blockcnt > size) {
1712 		ret = -EBUSY;
1713 	} else {
1714 		bdev->blockcnt = size;
1715 		ret = 0;
1716 	}
1717 
1718 	pthread_mutex_unlock(&bdev->internal.mutex);
1719 
1720 	return ret;
1721 }
1722 
1723 /*
1724  * Convert I/O offset and length from bytes to blocks.
1725  *
1726  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1727  */
1728 static uint64_t
1729 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1730 			  uint64_t num_bytes, uint64_t *num_blocks)
1731 {
1732 	uint32_t block_size = bdev->blocklen;
1733 
1734 	*offset_blocks = offset_bytes / block_size;
1735 	*num_blocks = num_bytes / block_size;
1736 
1737 	return (offset_bytes % block_size) | (num_bytes % block_size);
1738 }
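
/*
 * Worked example: on a bdev with blocklen = 512, offset_bytes = 4096 and
 * num_bytes = 8192 convert to offset_blocks = 8 and num_blocks = 16 with a
 * return value of 0.  If either byte value is not a multiple of 512, the OR of
 * the remainders is non-zero and the caller treats it as -EINVAL.
 */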
1739 
1740 static bool
1741 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1742 {
1743 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
1744 	 * an overflow, i.e. the offset has wrapped around */
1745 	if (offset_blocks + num_blocks < offset_blocks) {
1746 		return false;
1747 	}
1748 
1749 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1750 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1751 		return false;
1752 	}
1753 
1754 	return true;
1755 }
1756 
1757 int
1758 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1759 	       void *buf, uint64_t offset, uint64_t nbytes,
1760 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1761 {
1762 	uint64_t offset_blocks, num_blocks;
1763 
1764 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1765 		return -EINVAL;
1766 	}
1767 
1768 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1769 }
1770 
1771 int
1772 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1773 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1774 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1775 {
1776 	struct spdk_bdev *bdev = desc->bdev;
1777 	struct spdk_bdev_io *bdev_io;
1778 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1779 
1780 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1781 		return -EINVAL;
1782 	}
1783 
1784 	bdev_io = spdk_bdev_get_io(channel);
1785 	if (!bdev_io) {
1786 		return -ENOMEM;
1787 	}
1788 
1789 	bdev_io->internal.ch = channel;
1790 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1791 	bdev_io->u.bdev.iovs = &bdev_io->iov;
1792 	bdev_io->u.bdev.iovs[0].iov_base = buf;
1793 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
1794 	bdev_io->u.bdev.iovcnt = 1;
1795 	bdev_io->u.bdev.num_blocks = num_blocks;
1796 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1797 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1798 
1799 	spdk_bdev_io_submit(bdev_io);
1800 	return 0;
1801 }
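
/*
 * Usage sketch (hypothetical caller): read two blocks starting at block 0 and
 * release the spdk_bdev_io in the completion callback.  read_done(), desc,
 * io_ch and buf are illustrative names only.
 *
 *     static void
 *     read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *     {
 *             printf("read %s\n", success ? "succeeded" : "failed");
 *             spdk_bdev_free_io(bdev_io);
 *     }
 *
 *     ...
 *     rc = spdk_bdev_read_blocks(desc, io_ch, buf, 0, 2, read_done, NULL);
 *     if (rc == -ENOMEM) {
 *             // queue an spdk_bdev_io_wait_entry and retry later
 *     }
 */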
1802 
1803 int
1804 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1805 		struct iovec *iov, int iovcnt,
1806 		uint64_t offset, uint64_t nbytes,
1807 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1808 {
1809 	uint64_t offset_blocks, num_blocks;
1810 
1811 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1812 		return -EINVAL;
1813 	}
1814 
1815 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1816 }
1817 
1818 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1819 			   struct iovec *iov, int iovcnt,
1820 			   uint64_t offset_blocks, uint64_t num_blocks,
1821 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1822 {
1823 	struct spdk_bdev *bdev = desc->bdev;
1824 	struct spdk_bdev_io *bdev_io;
1825 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1826 
1827 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1828 		return -EINVAL;
1829 	}
1830 
1831 	bdev_io = spdk_bdev_get_io(channel);
1832 	if (!bdev_io) {
1833 		return -ENOMEM;
1834 	}
1835 
1836 	bdev_io->internal.ch = channel;
1837 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1838 	bdev_io->u.bdev.iovs = iov;
1839 	bdev_io->u.bdev.iovcnt = iovcnt;
1840 	bdev_io->u.bdev.num_blocks = num_blocks;
1841 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1842 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1843 
1844 	spdk_bdev_io_submit(bdev_io);
1845 	return 0;
1846 }
1847 
1848 int
1849 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1850 		void *buf, uint64_t offset, uint64_t nbytes,
1851 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1852 {
1853 	uint64_t offset_blocks, num_blocks;
1854 
1855 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1856 		return -EINVAL;
1857 	}
1858 
1859 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1860 }
1861 
1862 int
1863 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1864 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1865 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1866 {
1867 	struct spdk_bdev *bdev = desc->bdev;
1868 	struct spdk_bdev_io *bdev_io;
1869 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1870 
1871 	if (!desc->write) {
1872 		return -EBADF;
1873 	}
1874 
1875 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1876 		return -EINVAL;
1877 	}
1878 
1879 	bdev_io = spdk_bdev_get_io(channel);
1880 	if (!bdev_io) {
1881 		return -ENOMEM;
1882 	}
1883 
1884 	bdev_io->internal.ch = channel;
1885 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1886 	bdev_io->u.bdev.iovs = &bdev_io->iov;
1887 	bdev_io->u.bdev.iovs[0].iov_base = buf;
1888 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
1889 	bdev_io->u.bdev.iovcnt = 1;
1890 	bdev_io->u.bdev.num_blocks = num_blocks;
1891 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1892 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1893 
1894 	spdk_bdev_io_submit(bdev_io);
1895 	return 0;
1896 }
1897 
1898 int
1899 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1900 		 struct iovec *iov, int iovcnt,
1901 		 uint64_t offset, uint64_t len,
1902 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1903 {
1904 	uint64_t offset_blocks, num_blocks;
1905 
1906 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1907 		return -EINVAL;
1908 	}
1909 
1910 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1911 }
1912 
1913 int
1914 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1915 			struct iovec *iov, int iovcnt,
1916 			uint64_t offset_blocks, uint64_t num_blocks,
1917 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1918 {
1919 	struct spdk_bdev *bdev = desc->bdev;
1920 	struct spdk_bdev_io *bdev_io;
1921 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1922 
1923 	if (!desc->write) {
1924 		return -EBADF;
1925 	}
1926 
1927 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1928 		return -EINVAL;
1929 	}
1930 
1931 	bdev_io = spdk_bdev_get_io(channel);
1932 	if (!bdev_io) {
1933 		return -ENOMEM;
1934 	}
1935 
1936 	bdev_io->internal.ch = channel;
1937 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1938 	bdev_io->u.bdev.iovs = iov;
1939 	bdev_io->u.bdev.iovcnt = iovcnt;
1940 	bdev_io->u.bdev.num_blocks = num_blocks;
1941 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1942 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1943 
1944 	spdk_bdev_io_submit(bdev_io);
1945 	return 0;
1946 }
1947 
1948 int
1949 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1950 		       uint64_t offset, uint64_t len,
1951 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1952 {
1953 	uint64_t offset_blocks, num_blocks;
1954 
1955 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1956 		return -EINVAL;
1957 	}
1958 
1959 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1960 }
1961 
1962 int
1963 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1964 			      uint64_t offset_blocks, uint64_t num_blocks,
1965 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1966 {
1967 	struct spdk_bdev *bdev = desc->bdev;
1968 	struct spdk_bdev_io *bdev_io;
1969 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1970 	uint64_t len;
1971 	bool split_request = false;
1972 
1973 	if (!desc->write) {
1974 		return -EBADF;
1975 	}
1976 
1977 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1978 		return -EINVAL;
1979 	}
1980 
1981 	bdev_io = spdk_bdev_get_io(channel);
1982 
1983 	if (!bdev_io) {
1984 		return -ENOMEM;
1985 	}
1986 
1987 	bdev_io->internal.ch = channel;
1988 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1989 
1990 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1991 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1992 		bdev_io->u.bdev.num_blocks = num_blocks;
1993 		bdev_io->u.bdev.iovs = NULL;
1994 		bdev_io->u.bdev.iovcnt = 0;
1995 
1996 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
1997 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1998 
1999 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
2000 
2001 		if (len > ZERO_BUFFER_SIZE) {
2002 			split_request = true;
2003 			len = ZERO_BUFFER_SIZE;
2004 		}
2005 
2006 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2007 		bdev_io->u.bdev.iovs = &bdev_io->iov;
2008 		bdev_io->u.bdev.iovs[0].iov_base = g_bdev_mgr.zero_buffer;
2009 		bdev_io->u.bdev.iovs[0].iov_len = len;
2010 		bdev_io->u.bdev.iovcnt = 1;
2011 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
2012 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
2013 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
2014 	} else {
2015 		spdk_bdev_free_io(bdev_io);
2016 		return -ENOTSUP;
2017 	}
2018 
2019 	if (split_request) {
2020 		bdev_io->u.bdev.stored_user_cb = cb;
2021 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
2022 	} else {
2023 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2024 	}
2025 	spdk_bdev_io_submit(bdev_io);
2026 	return 0;
2027 }
2028 
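/*
 * Split arithmetic sketch (assuming a hypothetical 512-byte block size): when
 * the backing bdev lacks native WRITE_ZEROES support, a 10 MiB request is
 * first issued as a WRITE of ZERO_BUFFER_SIZE (1 MiB, i.e. 2048 blocks) from
 * g_bdev_mgr.zero_buffer, leaving split_remaining_num_blocks at 18432.
 * spdk_bdev_write_zeroes_split() then re-submits further 1 MiB rounds,
 * advancing split_current_offset_blocks by 2048 each time, until the
 * remainder reaches zero and the stored user callback is restored.
 */
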
2029 int
2030 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2031 		uint64_t offset, uint64_t nbytes,
2032 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2033 {
2034 	uint64_t offset_blocks, num_blocks;
2035 
2036 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2037 		return -EINVAL;
2038 	}
2039 
2040 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2041 }
2042 
2043 int
2044 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2045 		       uint64_t offset_blocks, uint64_t num_blocks,
2046 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2047 {
2048 	struct spdk_bdev *bdev = desc->bdev;
2049 	struct spdk_bdev_io *bdev_io;
2050 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2051 
2052 	if (!desc->write) {
2053 		return -EBADF;
2054 	}
2055 
2056 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2057 		return -EINVAL;
2058 	}
2059 
2060 	if (num_blocks == 0) {
2061 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
2062 		return -EINVAL;
2063 	}
2064 
2065 	bdev_io = spdk_bdev_get_io(channel);
2066 	if (!bdev_io) {
2067 		return -ENOMEM;
2068 	}
2069 
2070 	bdev_io->internal.ch = channel;
2071 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2072 
2073 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2074 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2075 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2076 	bdev_io->u.bdev.iovcnt = 1;
2077 
2078 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2079 	bdev_io->u.bdev.num_blocks = num_blocks;
2080 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2081 
2082 	spdk_bdev_io_submit(bdev_io);
2083 	return 0;
2084 }
2085 
2086 int
2087 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2088 		uint64_t offset, uint64_t length,
2089 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2090 {
2091 	uint64_t offset_blocks, num_blocks;
2092 
2093 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2094 		return -EINVAL;
2095 	}
2096 
2097 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2098 }
2099 
2100 int
2101 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2102 		       uint64_t offset_blocks, uint64_t num_blocks,
2103 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2104 {
2105 	struct spdk_bdev *bdev = desc->bdev;
2106 	struct spdk_bdev_io *bdev_io;
2107 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2108 
2109 	if (!desc->write) {
2110 		return -EBADF;
2111 	}
2112 
2113 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2114 		return -EINVAL;
2115 	}
2116 
2117 	bdev_io = spdk_bdev_get_io(channel);
2118 	if (!bdev_io) {
2119 		return -ENOMEM;
2120 	}
2121 
2122 	bdev_io->internal.ch = channel;
2123 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2124 	bdev_io->u.bdev.iovs = NULL;
2125 	bdev_io->u.bdev.iovcnt = 0;
2126 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2127 	bdev_io->u.bdev.num_blocks = num_blocks;
2128 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2129 
2130 	spdk_bdev_io_submit(bdev_io);
2131 	return 0;
2132 }
2133 
2134 static void
2135 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2136 {
2137 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2138 	struct spdk_bdev_io *bdev_io;
2139 
2140 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2141 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2142 	spdk_bdev_io_submit_reset(bdev_io);
2143 }
2144 
2145 static void
2146 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2147 {
2148 	struct spdk_io_channel		*ch;
2149 	struct spdk_bdev_channel	*channel;
2150 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2151 	struct spdk_bdev_shared_resource *shared_resource;
2152 	bdev_io_tailq_t			tmp_queued;
2153 
2154 	TAILQ_INIT(&tmp_queued);
2155 
2156 	ch = spdk_io_channel_iter_get_channel(i);
2157 	channel = spdk_io_channel_get_ctx(ch);
2158 	shared_resource = channel->shared_resource;
2159 	mgmt_channel = shared_resource->mgmt_ch;
2160 
2161 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2162 
2163 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2164 		/* The QoS object is always valid and readable while
2165 		 * the channel flag is set, so the lock here should not
2166 		 * be necessary. We're not in the fast path though, so
2167 		 * just take it anyway. */
2168 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2169 		if (channel->bdev->internal.qos->ch == channel) {
2170 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2171 		}
2172 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2173 	}
2174 
2175 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2176 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2177 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2178 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2179 
2180 	spdk_for_each_channel_continue(i, 0);
2181 }
2182 
2183 static void
2184 _spdk_bdev_start_reset(void *ctx)
2185 {
2186 	struct spdk_bdev_channel *ch = ctx;
2187 
2188 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2189 			      ch, _spdk_bdev_reset_dev);
2190 }
2191 
2192 static void
2193 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2194 {
2195 	struct spdk_bdev *bdev = ch->bdev;
2196 
2197 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2198 
2199 	pthread_mutex_lock(&bdev->internal.mutex);
2200 	if (bdev->internal.reset_in_progress == NULL) {
2201 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2202 		/*
2203 		 * Take a channel reference for the target bdev for the life of this
2204 		 *  reset.  This guards against the channel getting destroyed while
2205 		 *  spdk_for_each_channel() calls related to this reset IO are in
2206 		 *  progress.  We will release the reference when this reset is
2207 		 *  completed.
2208 		 */
2209 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2210 		_spdk_bdev_start_reset(ch);
2211 	}
2212 	pthread_mutex_unlock(&bdev->internal.mutex);
2213 }
2214 
2215 int
2216 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2217 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2218 {
2219 	struct spdk_bdev *bdev = desc->bdev;
2220 	struct spdk_bdev_io *bdev_io;
2221 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2222 
2223 	bdev_io = spdk_bdev_get_io(channel);
2224 	if (!bdev_io) {
2225 		return -ENOMEM;
2226 	}
2227 
2228 	bdev_io->internal.ch = channel;
2229 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2230 	bdev_io->u.reset.ch_ref = NULL;
2231 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2232 
2233 	pthread_mutex_lock(&bdev->internal.mutex);
2234 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2235 	pthread_mutex_unlock(&bdev->internal.mutex);
2236 
2237 	_spdk_bdev_channel_start_reset(channel);
2238 
2239 	return 0;
2240 }
2241 
2242 void
2243 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2244 		      struct spdk_bdev_io_stat *stat)
2245 {
2246 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2247 
2248 	*stat = channel->stat;
2249 }
2250 
2251 static void
2252 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2253 {
2254 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2255 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2256 
2257 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2258 			    bdev_iostat_ctx->cb_arg, 0);
2259 	free(bdev_iostat_ctx);
2260 }
2261 
2262 static void
2263 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2264 {
2265 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2266 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2267 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2268 
2269 	bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read;
2270 	bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops;
2271 	bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written;
2272 	bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops;
2273 
2274 	spdk_for_each_channel_continue(i, 0);
2275 }
2276 
2277 void
2278 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2279 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2280 {
2281 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2282 
2283 	assert(bdev != NULL);
2284 	assert(stat != NULL);
2285 	assert(cb != NULL);
2286 
2287 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2288 	if (bdev_iostat_ctx == NULL) {
2289 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2290 		cb(bdev, stat, cb_arg, -ENOMEM);
2291 		return;
2292 	}
2293 
2294 	bdev_iostat_ctx->stat = stat;
2295 	bdev_iostat_ctx->cb = cb;
2296 	bdev_iostat_ctx->cb_arg = cb_arg;
2297 
2298 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2299 			      _spdk_bdev_get_each_channel_stat,
2300 			      bdev_iostat_ctx,
2301 			      _spdk_bdev_get_device_stat_done);
2302 }
2303 
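/*
 * Usage sketch (stat_done and g_stat are hypothetical; the caller-provided
 * stat structure is typically zeroed first and must stay valid until the
 * callback runs, since per-channel counters are accumulated into it
 * asynchronously):
 *
 *	static void
 *	stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
 *		  void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("%s: %" PRIu64 " reads\n",
 *			       spdk_bdev_get_name(bdev), stat->num_read_ops);
 *		}
 *	}
 *
 *	spdk_bdev_get_device_stat(bdev, &g_stat, stat_done, NULL);
 */
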
2304 int
2305 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2306 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2307 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2308 {
2309 	struct spdk_bdev *bdev = desc->bdev;
2310 	struct spdk_bdev_io *bdev_io;
2311 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2312 
2313 	if (!desc->write) {
2314 		return -EBADF;
2315 	}
2316 
2317 	bdev_io = spdk_bdev_get_io(channel);
2318 	if (!bdev_io) {
2319 		return -ENOMEM;
2320 	}
2321 
2322 	bdev_io->internal.ch = channel;
2323 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2324 	bdev_io->u.nvme_passthru.cmd = *cmd;
2325 	bdev_io->u.nvme_passthru.buf = buf;
2326 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2327 	bdev_io->u.nvme_passthru.md_buf = NULL;
2328 	bdev_io->u.nvme_passthru.md_len = 0;
2329 
2330 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2331 
2332 	spdk_bdev_io_submit(bdev_io);
2333 	return 0;
2334 }
2335 
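/*
 * Illustrative sketch of an Identify Controller admin command (id_buf and
 * admin_done are hypothetical, and the buffer sizing is an assumption; the
 * backing bdev must support SPDK_BDEV_IO_TYPE_NVME_ADMIN):
 *
 *	struct spdk_nvme_cmd cmd = {};
 *
 *	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *	cmd.cdw10 = 1;
 *	rc = spdk_bdev_nvme_admin_passthru(desc, io_ch, &cmd, id_buf, 4096,
 *					   admin_done, NULL);
 */
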
2336 int
2337 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2338 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2339 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2340 {
2341 	struct spdk_bdev *bdev = desc->bdev;
2342 	struct spdk_bdev_io *bdev_io;
2343 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2344 
2345 	if (!desc->write) {
2346 		/*
2347 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2348 		 *  to easily determine if the command is a read or write, but for now just
2349 		 *  do not allow io_passthru with a read-only descriptor.
2350 		 */
2351 		return -EBADF;
2352 	}
2353 
2354 	bdev_io = spdk_bdev_get_io(channel);
2355 	if (!bdev_io) {
2356 		return -ENOMEM;
2357 	}
2358 
2359 	bdev_io->internal.ch = channel;
2360 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2361 	bdev_io->u.nvme_passthru.cmd = *cmd;
2362 	bdev_io->u.nvme_passthru.buf = buf;
2363 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2364 	bdev_io->u.nvme_passthru.md_buf = NULL;
2365 	bdev_io->u.nvme_passthru.md_len = 0;
2366 
2367 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2368 
2369 	spdk_bdev_io_submit(bdev_io);
2370 	return 0;
2371 }
2372 
2373 int
2374 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2375 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2376 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2377 {
2378 	struct spdk_bdev *bdev = desc->bdev;
2379 	struct spdk_bdev_io *bdev_io;
2380 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2381 
2382 	if (!desc->write) {
2383 		/*
2384 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2385 		 *  to easily determine if the command is a read or write, but for now just
2386 		 *  do not allow io_passthru with a read-only descriptor.
2387 		 */
2388 		return -EBADF;
2389 	}
2390 
2391 	bdev_io = spdk_bdev_get_io(channel);
2392 	if (!bdev_io) {
2393 		return -ENOMEM;
2394 	}
2395 
2396 	bdev_io->internal.ch = channel;
2397 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2398 	bdev_io->u.nvme_passthru.cmd = *cmd;
2399 	bdev_io->u.nvme_passthru.buf = buf;
2400 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2401 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2402 	bdev_io->u.nvme_passthru.md_len = md_len;
2403 
2404 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2405 
2406 	spdk_bdev_io_submit(bdev_io);
2407 	return 0;
2408 }
2409 
2410 int
2411 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2412 			struct spdk_bdev_io_wait_entry *entry)
2413 {
2414 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2415 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2416 
2417 	if (bdev != entry->bdev) {
2418 		SPDK_ERRLOG("bdevs do not match\n");
2419 		return -EINVAL;
2420 	}
2421 
2422 	if (mgmt_ch->per_thread_cache_count > 0) {
2423 		SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n");
2424 		return -EINVAL;
2425 	}
2426 
2427 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2428 	return 0;
2429 }
2430 
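/*
 * Typical -ENOMEM retry sketch (the ctx layout and retry_read() are
 * hypothetical; the wait entry must remain valid until its callback fires and
 * must be queued from the thread that owns the I/O channel):
 *
 *	ctx->bdev_io_wait.bdev = bdev;
 *	ctx->bdev_io_wait.cb_fn = retry_read;
 *	ctx->bdev_io_wait.cb_arg = ctx;
 *	spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->bdev_io_wait);
 */
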
2431 static void
2432 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2433 {
2434 	struct spdk_bdev *bdev = bdev_ch->bdev;
2435 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2436 	struct spdk_bdev_io *bdev_io;
2437 
2438 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2439 		/*
2440 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2441 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2442 		 *  the context of a completion, because the resources for the I/O are
2443 		 *  not released until control returns to the bdev poller.  Also, we
2444 		 *  may require several small I/O to complete before a larger I/O
2445 		 *  (that requires splitting) can be submitted.
2446 		 */
2447 		return;
2448 	}
2449 
2450 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2451 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2452 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2453 		bdev_io->internal.ch->io_outstanding++;
2454 		shared_resource->io_outstanding++;
2455 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2456 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2457 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2458 			break;
2459 		}
2460 	}
2461 }
2462 
2463 static inline void
2464 _spdk_bdev_io_complete(void *ctx)
2465 {
2466 	struct spdk_bdev_io *bdev_io = ctx;
2467 
2468 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2469 		/*
2470 		 * Send the completion to the thread that originally submitted the I/O,
2471 		 * which may not be the current thread in the case of QoS.
2472 		 */
2473 		if (bdev_io->internal.io_submit_ch) {
2474 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2475 			bdev_io->internal.io_submit_ch = NULL;
2476 		}
2477 
2478 		/*
2479 		 * Defer completion to avoid potential infinite recursion if the
2480 		 * user's completion callback issues a new I/O.
2481 		 */
2482 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2483 				     _spdk_bdev_io_complete, bdev_io);
2484 		return;
2485 	}
2486 
2487 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2488 		switch (bdev_io->type) {
2489 		case SPDK_BDEV_IO_TYPE_READ:
2490 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2491 			bdev_io->internal.ch->stat.num_read_ops++;
2492 			bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2493 			break;
2494 		case SPDK_BDEV_IO_TYPE_WRITE:
2495 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2496 			bdev_io->internal.ch->stat.num_write_ops++;
2497 			bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2498 			break;
2499 		default:
2500 			break;
2501 		}
2502 	}
2503 
2504 #ifdef SPDK_CONFIG_VTUNE
2505 	uint64_t now_tsc = spdk_get_ticks();
2506 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2507 		uint64_t data[5];
2508 
2509 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2510 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2511 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2512 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2513 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2514 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2515 
2516 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2517 				   __itt_metadata_u64, 5, data);
2518 
2519 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2520 		bdev_io->internal.ch->start_tsc = now_tsc;
2521 	}
2522 #endif
2523 
2524 	assert(bdev_io->internal.cb != NULL);
2525 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2526 
2527 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2528 			     bdev_io->internal.caller_ctx);
2529 }
2530 
2531 static void
2532 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2533 {
2534 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2535 
2536 	if (bdev_io->u.reset.ch_ref != NULL) {
2537 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2538 		bdev_io->u.reset.ch_ref = NULL;
2539 	}
2540 
2541 	_spdk_bdev_io_complete(bdev_io);
2542 }
2543 
2544 static void
2545 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2546 {
2547 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2548 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2549 
2550 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2551 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2552 		_spdk_bdev_channel_start_reset(ch);
2553 	}
2554 
2555 	spdk_for_each_channel_continue(i, 0);
2556 }
2557 
2558 void
2559 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2560 {
2561 	struct spdk_bdev *bdev = bdev_io->bdev;
2562 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2563 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2564 
2565 	bdev_io->internal.status = status;
2566 
2567 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2568 		bool unlock_channels = false;
2569 
2570 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2571 			SPDK_ERRLOG("NOMEM returned for reset\n");
2572 		}
2573 		pthread_mutex_lock(&bdev->internal.mutex);
2574 		if (bdev_io == bdev->internal.reset_in_progress) {
2575 			bdev->internal.reset_in_progress = NULL;
2576 			unlock_channels = true;
2577 		}
2578 		pthread_mutex_unlock(&bdev->internal.mutex);
2579 
2580 		if (unlock_channels) {
2581 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2582 					      bdev_io, _spdk_bdev_reset_complete);
2583 			return;
2584 		}
2585 	} else {
2586 		assert(bdev_ch->io_outstanding > 0);
2587 		assert(shared_resource->io_outstanding > 0);
2588 		bdev_ch->io_outstanding--;
2589 		shared_resource->io_outstanding--;
2590 
2591 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2592 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2593 			/*
2594 			 * Wait for some of the outstanding I/O to complete before we
2595 			 *  retry any of the nomem_io.  Normally we will wait for
2596 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2597 			 *  depth channels we will instead wait for half to complete.
2598 			 */
2599 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2600 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2601 			return;
2602 		}
2603 
2604 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2605 			_spdk_bdev_ch_retry_io(bdev_ch);
2606 		}
2607 	}
2608 
2609 	_spdk_bdev_io_complete(bdev_io);
2610 }
2611 
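/*
 * Backend modules call this from their completion paths, e.g. (sketch with a
 * hypothetical driver return code rc):
 *
 *	if (rc == -ENOMEM) {
 *		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
 *	} else {
 *		spdk_bdev_io_complete(bdev_io, rc == 0 ?
 *				      SPDK_BDEV_IO_STATUS_SUCCESS :
 *				      SPDK_BDEV_IO_STATUS_FAILED);
 *	}
 */
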
2612 void
2613 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2614 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2615 {
2616 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2617 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2618 	} else {
2619 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2620 		bdev_io->internal.error.scsi.sc = sc;
2621 		bdev_io->internal.error.scsi.sk = sk;
2622 		bdev_io->internal.error.scsi.asc = asc;
2623 		bdev_io->internal.error.scsi.ascq = ascq;
2624 	}
2625 
2626 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2627 }
2628 
2629 void
2630 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2631 			     int *sc, int *sk, int *asc, int *ascq)
2632 {
2633 	assert(sc != NULL);
2634 	assert(sk != NULL);
2635 	assert(asc != NULL);
2636 	assert(ascq != NULL);
2637 
2638 	switch (bdev_io->internal.status) {
2639 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2640 		*sc = SPDK_SCSI_STATUS_GOOD;
2641 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2642 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2643 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2644 		break;
2645 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2646 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2647 		break;
2648 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2649 		*sc = bdev_io->internal.error.scsi.sc;
2650 		*sk = bdev_io->internal.error.scsi.sk;
2651 		*asc = bdev_io->internal.error.scsi.asc;
2652 		*ascq = bdev_io->internal.error.scsi.ascq;
2653 		break;
2654 	default:
2655 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2656 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2657 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2658 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2659 		break;
2660 	}
2661 }
2662 
2663 void
2664 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2665 {
2666 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2667 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2668 	} else {
2669 		bdev_io->internal.error.nvme.sct = sct;
2670 		bdev_io->internal.error.nvme.sc = sc;
2671 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2672 	}
2673 
2674 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2675 }
2676 
2677 void
2678 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2679 {
2680 	assert(sct != NULL);
2681 	assert(sc != NULL);
2682 
2683 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2684 		*sct = bdev_io->internal.error.nvme.sct;
2685 		*sc = bdev_io->internal.error.nvme.sc;
2686 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2687 		*sct = SPDK_NVME_SCT_GENERIC;
2688 		*sc = SPDK_NVME_SC_SUCCESS;
2689 	} else {
2690 		*sct = SPDK_NVME_SCT_GENERIC;
2691 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2692 	}
2693 }
2694 
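/*
 * Illustrative use from a completion callback (the logging is hypothetical):
 *
 *	int sct, sc;
 *
 *	spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *	if (sct != SPDK_NVME_SCT_GENERIC || sc != SPDK_NVME_SC_SUCCESS) {
 *		SPDK_ERRLOG("I/O failed: sct=%d sc=%d\n", sct, sc);
 *	}
 */
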
2695 struct spdk_thread *
2696 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2697 {
2698 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
2699 }
2700 
2701 static void
2702 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
2703 			   enum spdk_bdev_qos_type qos_type)
2704 {
2705 	uint64_t	min_qos_set = 0;
2706 
2707 	switch (qos_type) {
2708 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2709 		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
2710 		break;
2711 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2712 		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
2713 		break;
2714 	default:
2715 		SPDK_ERRLOG("Unsupported QoS type.\n");
2716 		return;
2717 	}
2718 
2719 	if (qos_set % min_qos_set) {
2720 		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n",
2721 			    qos_set, bdev->name, min_qos_set);
2722 		SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name);
2723 		return;
2724 	}
2725 
2726 	if (!bdev->internal.qos) {
2727 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
2728 		if (!bdev->internal.qos) {
2729 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
2730 			return;
2731 		}
2732 	}
2733 
2734 	switch (qos_type) {
2735 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2736 		bdev->internal.qos->iops_rate_limit = qos_set;
2737 		break;
2738 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2739 		bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024;
2740 		break;
2741 	default:
2742 		break;
2743 	}
2744 
2745 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
2746 		      bdev->name, qos_type, qos_set);
2747 
2748 	return;
2749 }
2750 
2751 static void
2752 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2753 {
2754 	struct spdk_conf_section	*sp = NULL;
2755 	const char			*val = NULL;
2756 	uint64_t			qos_set = 0;
2757 	int				i = 0, j = 0;
2758 
2759 	sp = spdk_conf_find_section(NULL, "QoS");
2760 	if (!sp) {
2761 		return;
2762 	}
2763 
2764 	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
2765 		i = 0;
2766 		while (true) {
2767 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
2768 			if (!val) {
2769 				break;
2770 			}
2771 
2772 			if (strcmp(bdev->name, val) != 0) {
2773 				i++;
2774 				continue;
2775 			}
2776 
2777 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
2778 			if (val) {
2779 				qos_set = strtoull(val, NULL, 10);
2780 				_spdk_bdev_qos_config_type(bdev, qos_set, j);
2781 			}
2782 
2783 			break;
2784 		}
2785 
2786 		j++;
2787 	}
2788 
2789 	return;
2790 }
2791 
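/*
 * Based on the parsing above, the expected configuration shape is a [QoS]
 * section listing "<bdev name> <limit>" pairs per rate type, e.g.
 * (hypothetical bdev name; each value must be a multiple of the minimum
 * enforced by _spdk_bdev_qos_config_type()):
 *
 *	[QoS]
 *	  Limit_IOPS  Malloc0  20000
 *	  Limit_BWPS  Malloc0  100
 */
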
2792 static int
2793 spdk_bdev_init(struct spdk_bdev *bdev)
2794 {
2795 	assert(bdev->module != NULL);
2796 
2797 	if (!bdev->name) {
2798 		SPDK_ERRLOG("Bdev name is NULL\n");
2799 		return -EINVAL;
2800 	}
2801 
2802 	if (spdk_bdev_get_by_name(bdev->name)) {
2803 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2804 		return -EEXIST;
2805 	}
2806 
2807 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
2808 	bdev->internal.measured_queue_depth = UINT64_MAX;
2809 
2810 	TAILQ_INIT(&bdev->internal.open_descs);
2811 
2812 	TAILQ_INIT(&bdev->aliases);
2813 
2814 	bdev->internal.reset_in_progress = NULL;
2815 
2816 	_spdk_bdev_qos_config(bdev);
2817 
2818 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2819 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2820 				sizeof(struct spdk_bdev_channel));
2821 
2822 	pthread_mutex_init(&bdev->internal.mutex, NULL);
2823 	return 0;
2824 }
2825 
2826 static void
2827 spdk_bdev_destroy_cb(void *io_device)
2828 {
2829 	int			rc;
2830 	struct spdk_bdev	*bdev;
2831 	spdk_bdev_unregister_cb	cb_fn;
2832 	void			*cb_arg;
2833 
2834 	bdev = __bdev_from_io_dev(io_device);
2835 	cb_fn = bdev->internal.unregister_cb;
2836 	cb_arg = bdev->internal.unregister_ctx;
2837 
2838 	rc = bdev->fn_table->destruct(bdev->ctxt);
2839 	if (rc < 0) {
2840 		SPDK_ERRLOG("destruct failed\n");
2841 	}
2842 	if (rc <= 0 && cb_fn != NULL) {
2843 		cb_fn(cb_arg, rc);
2844 	}
2845 }
2846 
2848 static void
2849 spdk_bdev_fini(struct spdk_bdev *bdev)
2850 {
2851 	pthread_mutex_destroy(&bdev->internal.mutex);
2852 
2853 	free(bdev->internal.qos);
2854 
2855 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
2856 }
2857 
2858 static void
2859 spdk_bdev_start(struct spdk_bdev *bdev)
2860 {
2861 	struct spdk_bdev_module *module;
2862 	uint32_t action;
2863 
2864 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2865 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
2866 
2867 	/* Examine configuration before initializing I/O */
2868 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
2869 		if (module->examine_config) {
2870 			action = module->internal.action_in_progress;
2871 			module->internal.action_in_progress++;
2872 			module->examine_config(bdev);
2873 			if (action != module->internal.action_in_progress) {
2874 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
2875 					    module->name);
2876 			}
2877 		}
2878 	}
2879 
2880 	if (bdev->internal.claim_module) {
2881 		return;
2882 	}
2883 
2884 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
2885 		if (module->examine_disk) {
2886 			module->internal.action_in_progress++;
2887 			module->examine_disk(bdev);
2888 		}
2889 	}
2890 }
2891 
2892 int
2893 spdk_bdev_register(struct spdk_bdev *bdev)
2894 {
2895 	int rc = spdk_bdev_init(bdev);
2896 
2897 	if (rc == 0) {
2898 		spdk_bdev_start(bdev);
2899 	}
2900 
2901 	return rc;
2902 }
2903 
2904 static void
2905 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2906 {
2907 	struct spdk_bdev **bdevs;
2908 	struct spdk_bdev *base;
2909 	size_t i, j, k;
2910 	bool found;
2911 
2912 	/* Iterate over base bdevs to remove vbdev from them. */
2913 	for (i = 0; i < vbdev->internal.base_bdevs_cnt; i++) {
2914 		found = false;
2915 		base = vbdev->internal.base_bdevs[i];
2916 
2917 		for (j = 0; j < base->vbdevs_cnt; j++) {
2918 			if (base->vbdevs[j] != vbdev) {
2919 				continue;
2920 			}
2921 
2922 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2923 				base->vbdevs[k] = base->vbdevs[k + 1];
2924 			}
2925 
2926 			base->vbdevs_cnt--;
2927 			if (base->vbdevs_cnt > 0) {
2928 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2929 				/* It would be odd if shrinking a memory block failed. */
2930 				assert(bdevs);
2931 				base->vbdevs = bdevs;
2932 			} else {
2933 				free(base->vbdevs);
2934 				base->vbdevs = NULL;
2935 			}
2936 
2937 			found = true;
2938 			break;
2939 		}
2940 
2941 		if (!found) {
2942 			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
2943 		}
2944 	}
2945 
2946 	free(vbdev->internal.base_bdevs);
2947 	vbdev->internal.base_bdevs = NULL;
2948 	vbdev->internal.base_bdevs_cnt = 0;
2949 }
2950 
2951 static int
2952 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2953 {
2954 	struct spdk_bdev **vbdevs;
2955 	struct spdk_bdev *base;
2956 	size_t i;
2957 
2958 	/* Adding base bdevs isn't supported (yet?). */
2959 	assert(vbdev->internal.base_bdevs_cnt == 0);
2960 
2961 	vbdev->internal.base_bdevs = malloc(cnt * sizeof(vbdev->internal.base_bdevs[0]));
2962 	if (!vbdev->internal.base_bdevs) {
2963 		SPDK_ERRLOG("%s - realloc() failed\n", vbdev->name);
2964 		return -ENOMEM;
2965 	}
2966 
2967 	memcpy(vbdev->internal.base_bdevs, base_bdevs, cnt * sizeof(vbdev->internal.base_bdevs[0]));
2968 	vbdev->internal.base_bdevs_cnt = cnt;
2969 
2970 	/* Iterate over base bdevs to add this vbdev to them. */
2971 	for (i = 0; i < cnt; i++) {
2972 		base = vbdev->internal.base_bdevs[i];
2973 
2974 		assert(base != NULL);
2975 		assert(base->internal.claim_module != NULL);
2976 
2977 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2978 		if (!vbdevs) {
2979 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2980 			spdk_vbdev_remove_base_bdevs(vbdev);
2981 			return -ENOMEM;
2982 		}
2983 
2984 		vbdevs[base->vbdevs_cnt] = vbdev;
2985 		base->vbdevs = vbdevs;
2986 		base->vbdevs_cnt++;
2987 	}
2988 
2989 	return 0;
2990 }
2991 
2992 int
2993 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2994 {
2995 	int rc;
2996 
2997 	rc = spdk_bdev_init(vbdev);
2998 	if (rc) {
2999 		return rc;
3000 	}
3001 
3002 	if (base_bdev_count == 0) {
3003 		spdk_bdev_start(vbdev);
3004 		return 0;
3005 	}
3006 
3007 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
3008 	if (rc) {
3009 		spdk_bdev_fini(vbdev);
3010 		return rc;
3011 	}
3012 
3013 	spdk_bdev_start(vbdev);
3014 	return 0;
3016 }
3017 
3018 void
3019 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3020 {
3021 	if (bdev->internal.unregister_cb != NULL) {
3022 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3023 	}
3024 }
3025 
3026 static void
3027 _remove_notify(void *arg)
3028 {
3029 	struct spdk_bdev_desc *desc = arg;
3030 
3031 	desc->remove_cb(desc->remove_ctx);
3032 }
3033 
3034 void
3035 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3036 {
3037 	struct spdk_bdev_desc	*desc, *tmp;
3038 	bool			do_destruct = true;
3039 	struct spdk_thread	*thread;
3040 
3041 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3042 
3043 	thread = spdk_get_thread();
3044 	if (!thread) {
3045 		/* The user called this from a non-SPDK thread. */
3046 		if (cb_fn != NULL) {
3047 			cb_fn(cb_arg, -ENOTSUP);
3048 		}
3049 		return;
3050 	}
3051 
3052 	pthread_mutex_lock(&bdev->internal.mutex);
3053 
3054 	spdk_vbdev_remove_base_bdevs(bdev);
3055 
3056 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3057 	bdev->internal.unregister_cb = cb_fn;
3058 	bdev->internal.unregister_ctx = cb_arg;
3059 
3060 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3061 		if (desc->remove_cb) {
3062 			do_destruct = false;
3063 			/*
3064 			 * Defer invocation of the remove_cb to a separate message that will
3065 			 *  run later on this thread.  This ensures this context unwinds and
3066 			 *  we don't recursively unregister this bdev again if the remove_cb
3067 			 *  immediately closes its descriptor.
3068 			 */
3069 			if (!desc->remove_scheduled) {
3070 				/* Avoid scheduling removal of the same descriptor multiple times. */
3071 				desc->remove_scheduled = true;
3072 				spdk_thread_send_msg(thread, _remove_notify, desc);
3073 			}
3074 		}
3075 	}
3076 
3077 	if (!do_destruct) {
3078 		pthread_mutex_unlock(&bdev->internal.mutex);
3079 		return;
3080 	}
3081 
3082 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3083 	pthread_mutex_unlock(&bdev->internal.mutex);
3084 
3085 	spdk_bdev_fini(bdev);
3086 }
3087 
3088 int
3089 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3090 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3091 {
3092 	struct spdk_bdev_desc *desc;
3093 
3094 	desc = calloc(1, sizeof(*desc));
3095 	if (desc == NULL) {
3096 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3097 		return -ENOMEM;
3098 	}
3099 
3100 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3101 		      spdk_get_thread());
3102 
3103 	pthread_mutex_lock(&bdev->internal.mutex);
3104 
3105 	if (write && bdev->internal.claim_module) {
3106 		SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name);
3107 		free(desc);
3108 		pthread_mutex_unlock(&bdev->internal.mutex);
3109 		return -EPERM;
3110 	}
3111 
3112 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3113 
3114 	desc->bdev = bdev;
3115 	desc->remove_cb = remove_cb;
3116 	desc->remove_ctx = remove_ctx;
3117 	desc->write = write;
3118 	*_desc = desc;
3119 
3120 	pthread_mutex_unlock(&bdev->internal.mutex);
3121 
3122 	return 0;
3123 }
3124 
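/*
 * Illustrative open/use/close sketch (hot_remove_cb and my_ctx are
 * hypothetical; a write-mode open fails with -EPERM if the bdev is already
 * claimed by a module):
 *
 *	rc = spdk_bdev_open(bdev, true, hot_remove_cb, my_ctx, &desc);
 *	if (rc == 0) {
 *		io_ch = spdk_bdev_get_io_channel(desc);
 *		... submit I/O on io_ch ...
 *		spdk_put_io_channel(io_ch);
 *		spdk_bdev_close(desc);
 *	}
 */
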
3125 void
3126 spdk_bdev_close(struct spdk_bdev_desc *desc)
3127 {
3128 	struct spdk_bdev *bdev = desc->bdev;
3129 	bool do_unregister = false;
3130 
3131 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3132 		      spdk_get_thread());
3133 
3134 	pthread_mutex_lock(&bdev->internal.mutex);
3135 
3136 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3137 	free(desc);
3138 
3139 	/* If no more descriptors, kill QoS channel */
3140 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3141 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3142 			      bdev->name, spdk_get_thread());
3143 
3144 		if (spdk_bdev_qos_destroy(bdev)) {
3145 			/* There isn't anything we can do to recover here. Just let the
3146 			 * old QoS poller keep running. The QoS handling won't change
3147 			 * cores when the user allocates a new channel, but it won't break. */
3148 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3149 		}
3150 	}
3151 
3152 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3153 
3154 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3155 		do_unregister = true;
3156 	}
3157 	pthread_mutex_unlock(&bdev->internal.mutex);
3158 
3159 	if (do_unregister == true) {
3160 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3161 	}
3162 }
3163 
3164 int
3165 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3166 			    struct spdk_bdev_module *module)
3167 {
3168 	if (bdev->internal.claim_module != NULL) {
3169 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3170 			    bdev->internal.claim_module->name);
3171 		return -EPERM;
3172 	}
3173 
3174 	if (desc && !desc->write) {
3175 		desc->write = true;
3176 	}
3177 
3178 	bdev->internal.claim_module = module;
3179 	return 0;
3180 }
3181 
3182 void
3183 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3184 {
3185 	assert(bdev->internal.claim_module != NULL);
3186 	bdev->internal.claim_module = NULL;
3187 }
3188 
3189 struct spdk_bdev *
3190 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3191 {
3192 	return desc->bdev;
3193 }
3194 
3195 void
3196 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3197 {
3198 	struct iovec *iovs;
3199 	int iovcnt;
3200 
3201 	if (bdev_io == NULL) {
3202 		return;
3203 	}
3204 
3205 	switch (bdev_io->type) {
3206 	case SPDK_BDEV_IO_TYPE_READ:
3207 		iovs = bdev_io->u.bdev.iovs;
3208 		iovcnt = bdev_io->u.bdev.iovcnt;
3209 		break;
3210 	case SPDK_BDEV_IO_TYPE_WRITE:
3211 		iovs = bdev_io->u.bdev.iovs;
3212 		iovcnt = bdev_io->u.bdev.iovcnt;
3213 		break;
3214 	default:
3215 		iovs = NULL;
3216 		iovcnt = 0;
3217 		break;
3218 	}
3219 
3220 	if (iovp) {
3221 		*iovp = iovs;
3222 	}
3223 	if (iovcntp) {
3224 		*iovcntp = iovcnt;
3225 	}
3226 }
3227 
3228 void
3229 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3230 {
3232 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3233 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3234 		assert(false);
3235 	}
3236 
3237 	if (bdev_module->async_init) {
3238 		bdev_module->internal.action_in_progress = 1;
3239 	}
3240 
3241 	/*
3242 	 * Modules with examine callbacks must be initialized first, so they are
3243 	 *  ready to handle examine callbacks from later modules that will
3244 	 *  register physical bdevs.
3245 	 */
3246 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3247 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3248 	} else {
3249 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3250 	}
3251 }
3252 
3253 struct spdk_bdev_module *
3254 spdk_bdev_module_list_find(const char *name)
3255 {
3256 	struct spdk_bdev_module *bdev_module;
3257 
3258 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3259 		if (strcmp(name, bdev_module->name) == 0) {
3260 			break;
3261 		}
3262 	}
3263 
3264 	return bdev_module;
3265 }
3266 
3267 static void
3268 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3269 {
3270 	uint64_t len;
3271 
3272 	if (!success) {
3273 		bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb;
3274 		_spdk_bdev_io_complete(bdev_io);
3275 		return;
3276 	}
3277 
3278 	/* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. */
3279 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
3280 		       ZERO_BUFFER_SIZE);
3281 
3282 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
3283 	bdev_io->u.bdev.iovs[0].iov_len = len;
3284 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
3285 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
3286 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
3287 
3288 	/* if this round completes the i/o, change the callback to be the original user callback */
3289 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
3290 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
3291 	} else {
3292 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
3293 	}
3294 	spdk_bdev_io_submit(bdev_io);
3295 }
3296 
3297 struct set_qos_limit_ctx {
3298 	void (*cb_fn)(void *cb_arg, int status);
3299 	void *cb_arg;
3300 	struct spdk_bdev *bdev;
3301 };
3302 
3303 static void
3304 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3305 {
3306 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3307 	ctx->bdev->internal.qos_mod_in_progress = false;
3308 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3309 
3310 	ctx->cb_fn(ctx->cb_arg, status);
3311 	free(ctx);
3312 }
3313 
3314 static void
3315 _spdk_bdev_disable_qos_done(void *cb_arg)
3316 {
3317 	struct set_qos_limit_ctx *ctx = cb_arg;
3318 	struct spdk_bdev *bdev = ctx->bdev;
3319 	struct spdk_bdev_io *bdev_io;
3320 	struct spdk_bdev_qos *qos;
3321 
3322 	pthread_mutex_lock(&bdev->internal.mutex);
3323 	qos = bdev->internal.qos;
3324 	bdev->internal.qos = NULL;
3325 	pthread_mutex_unlock(&bdev->internal.mutex);
3326 
3327 	while (!TAILQ_EMPTY(&qos->queued)) {
3328 		/* Send queued I/O back to their original thread for resubmission. */
3329 		bdev_io = TAILQ_FIRST(&qos->queued);
3330 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3331 
3332 		if (bdev_io->internal.io_submit_ch) {
3333 			/*
3334 			 * Channel was changed when sending it to the QoS thread - change it back
3335 			 *  before sending it back to the original thread.
3336 			 */
3337 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3338 			bdev_io->internal.io_submit_ch = NULL;
3339 		}
3340 
3341 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3342 				     _spdk_bdev_io_submit, bdev_io);
3343 	}
3344 
3345 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3346 	spdk_poller_unregister(&qos->poller);
3347 
3348 	free(qos);
3349 
3350 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3351 }
3352 
3353 static void
3354 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3355 {
3356 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3357 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3358 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3359 	struct spdk_thread *thread;
3360 
3361 	pthread_mutex_lock(&bdev->internal.mutex);
3362 	thread = bdev->internal.qos->thread;
3363 	pthread_mutex_unlock(&bdev->internal.mutex);
3364 
3365 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3366 }
3367 
3368 static void
3369 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3370 {
3371 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3372 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3373 
3374 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3375 
3376 	spdk_for_each_channel_continue(i, 0);
3377 }
3378 
3379 static void
3380 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
3381 {
3382 	struct set_qos_limit_ctx *ctx = cb_arg;
3383 	struct spdk_bdev *bdev = ctx->bdev;
3384 
3385 	pthread_mutex_lock(&bdev->internal.mutex);
3386 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3387 	pthread_mutex_unlock(&bdev->internal.mutex);
3388 
3389 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3390 }
3391 
3392 static void
3393 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3394 {
3395 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3396 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3397 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3398 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3399 	int rc;
3400 
3401 	pthread_mutex_lock(&bdev->internal.mutex);
3402 	rc = _spdk_bdev_enable_qos(bdev, bdev_ch);
3403 	pthread_mutex_unlock(&bdev->internal.mutex);
3404 	spdk_for_each_channel_continue(i, rc);
3405 }
3406 
3407 static void
3408 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3409 {
3410 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3411 
3412 	_spdk_bdev_set_qos_limit_done(ctx, status);
3413 }
3414 
3415 void
3416 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
3417 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3418 {
3419 	struct set_qos_limit_ctx *ctx;
3420 
3421 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
3422 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
3423 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
3424 		cb_fn(cb_arg, -EINVAL);
3425 		return;
3426 	}
3427 
3428 	ctx = calloc(1, sizeof(*ctx));
3429 	if (ctx == NULL) {
3430 		cb_fn(cb_arg, -ENOMEM);
3431 		return;
3432 	}
3433 
3434 	ctx->cb_fn = cb_fn;
3435 	ctx->cb_arg = cb_arg;
3436 	ctx->bdev = bdev;
3437 
3438 	pthread_mutex_lock(&bdev->internal.mutex);
3439 	if (bdev->internal.qos_mod_in_progress) {
3440 		pthread_mutex_unlock(&bdev->internal.mutex);
3441 		free(ctx);
3442 		cb_fn(cb_arg, -EAGAIN);
3443 		return;
3444 	}
3445 	bdev->internal.qos_mod_in_progress = true;
3446 
3447 	if (ios_per_sec > 0) {
3448 		if (bdev->internal.qos == NULL) {
3449 			/* Enabling */
3450 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3451 			if (!bdev->internal.qos) {
3452 				pthread_mutex_unlock(&bdev->internal.mutex);
3453 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3454 				free(ctx);
3455 				cb_fn(cb_arg, -ENOMEM);
3456 				return;
3457 			}
3458 
3459 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3460 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3461 					      _spdk_bdev_enable_qos_msg, ctx,
3462 					      _spdk_bdev_enable_qos_done);
3463 		} else {
3464 			/* Updating */
3465 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3466 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3467 		}
3468 	} else {
3469 		if (bdev->internal.qos != NULL) {
3470 			/* Disabling */
3471 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3472 					      _spdk_bdev_disable_qos_msg, ctx,
3473 					      _spdk_bdev_disable_qos_msg_done);
3474 		} else {
3475 			pthread_mutex_unlock(&bdev->internal.mutex);
3476 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3477 			return;
3478 		}
3479 	}
3480 
3481 	pthread_mutex_unlock(&bdev->internal.mutex);
3482 }
3483 
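/*
 * Usage sketch (qos_done is a hypothetical completion callback): a non-zero
 * rate enables or updates the limit, 0 disables it, and the rate must be a
 * multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC:
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, qos_done, NULL);
 */
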
3484 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3485