xref: /spdk/lib/bdev/bdev.c (revision 2fac05e919e1940137e4502f01beabb81ebbef9c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk/bdev_module.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
69 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
70 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10
71 
72 enum spdk_bdev_qos_type {
73 	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
74 	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
75 	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
76 };
77 
78 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};
79 
80 struct spdk_bdev_mgr {
81 	struct spdk_mempool *bdev_io_pool;
82 
83 	struct spdk_mempool *buf_small_pool;
84 	struct spdk_mempool *buf_large_pool;
85 
86 	void *zero_buffer;
87 
88 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
89 
90 	TAILQ_HEAD(, spdk_bdev) bdevs;
91 
92 	bool init_complete;
93 	bool module_init_complete;
94 
95 #ifdef SPDK_CONFIG_VTUNE
96 	__itt_domain	*domain;
97 #endif
98 };
99 
100 static struct spdk_bdev_mgr g_bdev_mgr = {
101 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
102 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
103 	.init_complete = false,
104 	.module_init_complete = false,
105 };
106 
107 static struct spdk_bdev_opts	g_bdev_opts = {
108 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
109 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
110 };
111 
112 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
113 static void			*g_init_cb_arg = NULL;
114 
115 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
116 static void			*g_fini_cb_arg = NULL;
117 static struct spdk_thread	*g_fini_thread = NULL;
118 
119 struct spdk_bdev_qos {
120 	/** Rate limit, in I/O per second */
121 	uint64_t iops_rate_limit;
122 
123 	/** Rate limit, in byte per second */
124 	uint64_t byte_rate_limit;
125 
126 	/** The channel that all I/O are funneled through */
127 	struct spdk_bdev_channel *ch;
128 
129 	/** The thread on which the poller is running. */
130 	struct spdk_thread *thread;
131 
132 	/** Queue of I/O waiting to be issued. */
133 	bdev_io_tailq_t queued;
134 
135 	/** Maximum number of I/Os allowed to be issued in one timeslice (e.g., 1ms);
136 	 *  only valid for the master channel, which manages the outstanding I/Os. */
137 	uint64_t max_ios_per_timeslice;
138 
139 	/** Maximum number of bytes allowed to be issued in one timeslice (e.g., 1ms);
140 	 *  only valid for the master channel, which manages the outstanding I/Os. */
141 	uint64_t max_byte_per_timeslice;
142 
143 	/** Number of I/Os submitted in the current timeslice (e.g., 1ms) */
144 	uint64_t io_submitted_this_timeslice;
145 
146 	/** Number of bytes submitted in the current timeslice (e.g., 1ms) */
147 	uint64_t byte_submitted_this_timeslice;
148 
149 	/** Poller that processes queued I/O commands each time slice. */
150 	struct spdk_poller *poller;
151 };
152 
153 struct spdk_bdev_mgmt_channel {
154 	bdev_io_stailq_t need_buf_small;
155 	bdev_io_stailq_t need_buf_large;
156 
157 	/*
158 	 * Each thread keeps a cache of bdev_io - this allows
159 	 *  bdev threads which are *not* DPDK threads to still
160 	 *  benefit from a per-thread bdev_io cache.  Without
161 	 *  this, non-DPDK threads fetching from the mempool
162 	 *  incur a cmpxchg on get and put.
163 	 */
164 	bdev_io_stailq_t per_thread_cache;
165 	uint32_t	per_thread_cache_count;
166 	uint32_t	bdev_io_cache_size;
167 
168 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
169 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
170 };
171 
172 /*
173  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
174  * will queue their I/O awaiting retry here. This makes it possible to retry
175  * sending I/O to one bdev after I/O from another bdev completes.
176  */
177 struct spdk_bdev_shared_resource {
178 	/* The bdev management channel */
179 	struct spdk_bdev_mgmt_channel *mgmt_ch;
180 
181 	/*
182 	 * Count of I/O submitted to bdev module and waiting for completion.
183 	 * Incremented before submit_request() is called on an spdk_bdev_io.
184 	 */
185 	uint64_t		io_outstanding;
186 
187 	/*
188 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
189 	 *  on this channel.
190 	 */
191 	bdev_io_tailq_t		nomem_io;
192 
193 	/*
194 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
195 	 */
196 	uint64_t		nomem_threshold;
197 
198 	/* I/O channel allocated by a bdev module */
199 	struct spdk_io_channel	*shared_ch;
200 
201 	/* Refcount of bdev channels using this resource */
202 	uint32_t		ref;
203 
204 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
205 };
206 
207 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
208 #define BDEV_CH_QOS_ENABLED		(1 << 1)
209 
210 struct spdk_bdev_channel {
211 	struct spdk_bdev	*bdev;
212 
213 	/* The channel for the underlying device */
214 	struct spdk_io_channel	*channel;
215 
216 	/* Per io_device per thread data */
217 	struct spdk_bdev_shared_resource *shared_resource;
218 
219 	struct spdk_bdev_io_stat stat;
220 
221 	/*
222 	 * Count of I/O submitted through this channel and waiting for completion.
223 	 * Incremented before submit_request() is called on an spdk_bdev_io.
224 	 */
225 	uint64_t		io_outstanding;
226 
227 	bdev_io_tailq_t		queued_resets;
228 
229 	uint32_t		flags;
230 
231 #ifdef SPDK_CONFIG_VTUNE
232 	uint64_t		start_tsc;
233 	uint64_t		interval_tsc;
234 	__itt_string_handle	*handle;
235 	struct spdk_bdev_io_stat prev_stat;
236 #endif
237 
238 };
239 
240 struct spdk_bdev_desc {
241 	struct spdk_bdev		*bdev;
242 	spdk_bdev_remove_cb_t		remove_cb;
243 	void				*remove_ctx;
244 	bool				remove_scheduled;
245 	bool				write;
246 	TAILQ_ENTRY(spdk_bdev_desc)	link;
247 };
248 
249 struct spdk_bdev_iostat_ctx {
250 	struct spdk_bdev_io_stat *stat;
251 	spdk_bdev_get_device_stat_cb cb;
252 	void *cb_arg;
253 };
254 
255 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
256 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
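/*
 * Note: the io_device registered for a bdev is the bdev pointer offset by one
 * byte. This keeps the io_device address unique; presumably it avoids
 * colliding with an io_device that a bdev module might register using the
 * bdev (or its ctxt) pointer directly.
 */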
257 
258 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
259 
260 void
261 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
262 {
263 	*opts = g_bdev_opts;
264 }
265 
266 int
267 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
268 {
269 	uint32_t min_pool_size;
270 
271 	/*
272 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
273 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
274 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
275 	 */
276 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
277 	if (opts->bdev_io_pool_size < min_pool_size) {
278 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
279 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
280 			    spdk_thread_get_count());
281 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
282 		return -1;
283 	}
284 
285 	g_bdev_opts = *opts;
286 	return 0;
287 }
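/*
 * Typical usage of the options API is read-modify-write before
 * spdk_bdev_initialize() runs: fetch the defaults, adjust, and write back.
 * A minimal sketch (the sizes below are illustrative, not recommendations):
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 32 * 1024;
 *	opts.bdev_io_cache_size = 128;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		SPDK_ERRLOG("invalid bdev opts\n");
 *	}
 */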
288 
289 struct spdk_bdev *
290 spdk_bdev_first(void)
291 {
292 	struct spdk_bdev *bdev;
293 
294 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
295 	if (bdev) {
296 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
297 	}
298 
299 	return bdev;
300 }
301 
302 struct spdk_bdev *
303 spdk_bdev_next(struct spdk_bdev *prev)
304 {
305 	struct spdk_bdev *bdev;
306 
307 	bdev = TAILQ_NEXT(prev, internal.link);
308 	if (bdev) {
309 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
310 	}
311 
312 	return bdev;
313 }
314 
315 static struct spdk_bdev *
316 _bdev_next_leaf(struct spdk_bdev *bdev)
317 {
318 	while (bdev != NULL) {
319 		if (bdev->internal.claim_module == NULL) {
320 			return bdev;
321 		} else {
322 			bdev = TAILQ_NEXT(bdev, internal.link);
323 		}
324 	}
325 
326 	return bdev;
327 }
328 
329 struct spdk_bdev *
330 spdk_bdev_first_leaf(void)
331 {
332 	struct spdk_bdev *bdev;
333 
334 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
335 
336 	if (bdev) {
337 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
338 	}
339 
340 	return bdev;
341 }
342 
343 struct spdk_bdev *
344 spdk_bdev_next_leaf(struct spdk_bdev *prev)
345 {
346 	struct spdk_bdev *bdev;
347 
348 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
349 
350 	if (bdev) {
351 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
352 	}
353 
354 	return bdev;
355 }
356 
357 struct spdk_bdev *
358 spdk_bdev_get_by_name(const char *bdev_name)
359 {
360 	struct spdk_bdev_alias *tmp;
361 	struct spdk_bdev *bdev = spdk_bdev_first();
362 
363 	while (bdev != NULL) {
364 		if (strcmp(bdev_name, bdev->name) == 0) {
365 			return bdev;
366 		}
367 
368 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
369 			if (strcmp(bdev_name, tmp->alias) == 0) {
370 				return bdev;
371 			}
372 		}
373 
374 		bdev = spdk_bdev_next(bdev);
375 	}
376 
377 	return NULL;
378 }
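/*
 * The lookup above matches either the primary name or any registered alias.
 * For example ("Malloc0" is an illustrative name):
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *
 *	if (bdev == NULL) {
 *		SPDK_ERRLOG("bdev Malloc0 not found\n");
 *	}
 */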
379 
380 static void
381 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
382 {
383 	assert(bdev_io->internal.get_buf_cb != NULL);
384 	assert(buf != NULL);
385 	assert(bdev_io->u.bdev.iovs != NULL);
386 
387 	bdev_io->internal.buf = buf;
388 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
389 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->internal.buf_len;
390 	bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
391 }
392 
393 static void
394 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
395 {
396 	struct spdk_mempool *pool;
397 	struct spdk_bdev_io *tmp;
398 	void *buf;
399 	bdev_io_stailq_t *stailq;
400 	struct spdk_bdev_mgmt_channel *ch;
401 
402 	assert(bdev_io->u.bdev.iovcnt == 1);
403 
404 	buf = bdev_io->internal.buf;
405 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
406 
407 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
408 		pool = g_bdev_mgr.buf_small_pool;
409 		stailq = &ch->need_buf_small;
410 	} else {
411 		pool = g_bdev_mgr.buf_large_pool;
412 		stailq = &ch->need_buf_large;
413 	}
414 
415 	if (STAILQ_EMPTY(stailq)) {
416 		spdk_mempool_put(pool, buf);
417 	} else {
418 		tmp = STAILQ_FIRST(stailq);
419 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
420 		spdk_bdev_io_set_buf(tmp, buf);
421 	}
422 }
423 
424 void
425 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
426 {
427 	struct spdk_mempool *pool;
428 	bdev_io_stailq_t *stailq;
429 	void *buf = NULL;
430 	struct spdk_bdev_mgmt_channel *mgmt_ch;
431 
432 	assert(cb != NULL);
433 	assert(bdev_io->u.bdev.iovs != NULL);
434 
435 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
436 		/* Buffer already present */
437 		cb(bdev_io->internal.ch->channel, bdev_io);
438 		return;
439 	}
440 
441 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
442 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
443 
444 	bdev_io->internal.buf_len = len;
445 	bdev_io->internal.get_buf_cb = cb;
446 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
447 		pool = g_bdev_mgr.buf_small_pool;
448 		stailq = &mgmt_ch->need_buf_small;
449 	} else {
450 		pool = g_bdev_mgr.buf_large_pool;
451 		stailq = &mgmt_ch->need_buf_large;
452 	}
453 
454 	buf = spdk_mempool_get(pool);
455 
456 	if (!buf) {
457 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
458 	} else {
459 		spdk_bdev_io_set_buf(bdev_io, buf);
460 	}
461 }
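/*
 * spdk_bdev_io_get_buf() is normally called from a bdev module's
 * submit_request() path for READ I/O that arrives without a data buffer.
 * A hedged sketch (my_bdev_read_cb and my_bdev_do_read are hypothetical
 * module functions; the callback signature matches get_buf_cb above):
 *
 *	static void
 *	my_bdev_read_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// A buffer is now attached at bdev_io->u.bdev.iovs[0].
 *		my_bdev_do_read(ch, bdev_io);
 *	}
 *
 *	case SPDK_BDEV_IO_TYPE_READ:
 *		spdk_bdev_io_get_buf(bdev_io, my_bdev_read_cb,
 *				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *		break;
 */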
462 
463 static int
464 spdk_bdev_module_get_max_ctx_size(void)
465 {
466 	struct spdk_bdev_module *bdev_module;
467 	int max_bdev_module_size = 0;
468 
469 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
470 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
471 			max_bdev_module_size = bdev_module->get_ctx_size();
472 		}
473 	}
474 
475 	return max_bdev_module_size;
476 }
477 
478 void
479 spdk_bdev_config_text(FILE *fp)
480 {
481 	struct spdk_bdev_module *bdev_module;
482 
483 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
484 		if (bdev_module->config_text) {
485 			bdev_module->config_text(fp);
486 		}
487 	}
488 }
489 
490 void
491 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
492 {
493 	struct spdk_bdev_module *bdev_module;
494 	struct spdk_bdev *bdev;
495 
496 	assert(w != NULL);
497 
498 	spdk_json_write_array_begin(w);
499 
500 	spdk_json_write_object_begin(w);
501 	spdk_json_write_named_string(w, "method", "set_bdev_options");
502 	spdk_json_write_name(w, "params");
503 	spdk_json_write_object_begin(w);
504 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
505 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
506 	spdk_json_write_object_end(w);
507 	spdk_json_write_object_end(w);
508 
509 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
510 		if (bdev_module->config_json) {
511 			bdev_module->config_json(w);
512 		}
513 	}
514 
515 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
516 		spdk_bdev_config_json(bdev, w);
517 	}
518 
519 	spdk_json_write_array_end(w);
520 }
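/*
 * The set_bdev_options entry written above serializes the current globals;
 * with the default g_bdev_opts it looks like:
 *
 *	{
 *	  "method": "set_bdev_options",
 *	  "params": {
 *	    "bdev_io_pool_size": 65536,
 *	    "bdev_io_cache_size": 256
 *	  }
 *	}
 */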
521 
522 static int
523 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
524 {
525 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
526 	struct spdk_bdev_io *bdev_io;
527 	uint32_t i;
528 
529 	STAILQ_INIT(&ch->need_buf_small);
530 	STAILQ_INIT(&ch->need_buf_large);
531 
532 	STAILQ_INIT(&ch->per_thread_cache);
533 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
534 
535 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
536 	ch->per_thread_cache_count = 0;
537 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
538 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
539 		assert(bdev_io != NULL);
540 		ch->per_thread_cache_count++;
541 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
542 	}
543 
544 	TAILQ_INIT(&ch->shared_resources);
545 	TAILQ_INIT(&ch->io_wait_queue);
546 
547 	return 0;
548 }
549 
550 static void
551 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
552 {
553 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
554 	struct spdk_bdev_io *bdev_io;
555 
556 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
557 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
558 	}
559 
560 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
561 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
562 	}
563 
564 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
565 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
566 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
567 		ch->per_thread_cache_count--;
568 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
569 	}
570 
571 	assert(ch->per_thread_cache_count == 0);
572 }
573 
574 static void
575 spdk_bdev_init_complete(int rc)
576 {
577 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
578 	void *cb_arg = g_init_cb_arg;
579 	struct spdk_bdev_module *m;
580 
581 	g_bdev_mgr.init_complete = true;
582 	g_init_cb_fn = NULL;
583 	g_init_cb_arg = NULL;
584 
585 	/*
586 	 * For modules that need to know when subsystem init is complete,
587 	 * inform them now.
588 	 */
589 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
590 		if (m->init_complete) {
591 			m->init_complete();
592 		}
593 	}
594 
595 	cb_fn(cb_arg, rc);
596 }
597 
598 static void
599 spdk_bdev_module_action_complete(void)
600 {
601 	struct spdk_bdev_module *m;
602 
603 	/*
604 	 * Don't finish bdev subsystem initialization if
605 	 * module pre-initialization is still in progress, or
606 	 * the subsystem has already been initialized.
607 	 */
608 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
609 		return;
610 	}
611 
612 	/*
613 	 * Check all bdev modules for inits/examinations in progress. If any
614 	 * exist, return immediately since we cannot finish bdev subsystem
615 	 * initialization until all are completed.
616 	 */
617 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
618 		if (m->internal.action_in_progress > 0) {
619 			return;
620 		}
621 	}
622 
623 	/*
624 	 * Modules already finished initialization - now that all
625 	 * the bdev modules have finished their asynchronous I/O
626 	 * processing, the entire bdev layer can be marked as complete.
627 	 */
628 	spdk_bdev_init_complete(0);
629 }
630 
631 static void
632 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
633 {
634 	assert(module->internal.action_in_progress > 0);
635 	module->internal.action_in_progress--;
636 	spdk_bdev_module_action_complete();
637 }
638 
639 void
640 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
641 {
642 	spdk_bdev_module_action_done(module);
643 }
644 
645 void
646 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
647 {
648 	spdk_bdev_module_action_done(module);
649 }
650 
651 static int
652 spdk_bdev_modules_init(void)
653 {
654 	struct spdk_bdev_module *module;
655 	int rc = 0;
656 
657 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
658 		rc = module->module_init();
659 		if (rc != 0) {
660 			break;
661 		}
662 	}
663 
664 	g_bdev_mgr.module_init_complete = true;
665 	return rc;
666 }
667 
668 void
669 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
670 {
671 	struct spdk_conf_section *sp;
672 	struct spdk_bdev_opts bdev_opts;
673 	int32_t bdev_io_pool_size, bdev_io_cache_size;
674 	int cache_size;
675 	int rc = 0;
676 	char mempool_name[32];
677 
678 	assert(cb_fn != NULL);
679 
680 	sp = spdk_conf_find_section(NULL, "Bdev");
681 	if (sp != NULL) {
682 		spdk_bdev_get_opts(&bdev_opts);
683 
684 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
685 		if (bdev_io_pool_size >= 0) {
686 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
687 		}
688 
689 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
690 		if (bdev_io_cache_size >= 0) {
691 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
692 		}
693 
694 		if (spdk_bdev_set_opts(&bdev_opts)) {
695 			spdk_bdev_init_complete(-1);
696 			return;
697 		}
698 
699 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
700 	}
701 
702 	g_init_cb_fn = cb_fn;
703 	g_init_cb_arg = cb_arg;
704 
705 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
706 
707 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
708 				  g_bdev_opts.bdev_io_pool_size,
709 				  sizeof(struct spdk_bdev_io) +
710 				  spdk_bdev_module_get_max_ctx_size(),
711 				  0,
712 				  SPDK_ENV_SOCKET_ID_ANY);
713 
714 	if (g_bdev_mgr.bdev_io_pool == NULL) {
715 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
716 		spdk_bdev_init_complete(-1);
717 		return;
718 	}
719 
720 	/**
721 	 * Ensure no more than half of the total buffers end up in local caches, by
722 	 *   using spdk_thread_get_count() to determine how many local caches we need
723 	 *   to account for.
724 	 */
725 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
726 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
727 
728 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
729 				    BUF_SMALL_POOL_SIZE,
730 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
731 				    cache_size,
732 				    SPDK_ENV_SOCKET_ID_ANY);
733 	if (!g_bdev_mgr.buf_small_pool) {
734 		SPDK_ERRLOG("create buf small pool failed\n");
735 		spdk_bdev_init_complete(-1);
736 		return;
737 	}
738 
739 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
740 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
741 
742 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
743 				    BUF_LARGE_POOL_SIZE,
744 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
745 				    cache_size,
746 				    SPDK_ENV_SOCKET_ID_ANY);
747 	if (!g_bdev_mgr.buf_large_pool) {
748 		SPDK_ERRLOG("create buf large pool failed\n");
749 		spdk_bdev_init_complete(-1);
750 		return;
751 	}
752 
753 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
754 				 NULL);
755 	if (!g_bdev_mgr.zero_buffer) {
756 		SPDK_ERRLOG("create bdev zero buffer failed\n");
757 		spdk_bdev_init_complete(-1);
758 		return;
759 	}
760 
761 #ifdef SPDK_CONFIG_VTUNE
762 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
763 #endif
764 
765 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
766 				spdk_bdev_mgmt_channel_destroy,
767 				sizeof(struct spdk_bdev_mgmt_channel));
768 
769 	rc = spdk_bdev_modules_init();
770 	if (rc != 0) {
771 		SPDK_ERRLOG("bdev modules init failed\n");
772 		spdk_bdev_init_complete(-1);
773 		return;
774 	}
775 
776 	spdk_bdev_module_action_complete();
777 }
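/*
 * The "[Bdev]" section parsed at the top of spdk_bdev_initialize() maps onto
 * the options above; an illustrative configuration-file snippet:
 *
 *	[Bdev]
 *	  BdevIoPoolSize 65536
 *	  BdevIoCacheSize 256
 */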
778 
779 static void
780 spdk_bdev_mgr_unregister_cb(void *io_device)
781 {
782 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
783 
784 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
785 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
786 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
787 			    g_bdev_opts.bdev_io_pool_size);
788 	}
789 
790 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
791 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
792 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
793 			    BUF_SMALL_POOL_SIZE);
794 		assert(false);
795 	}
796 
797 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
798 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
799 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
800 			    BUF_LARGE_POOL_SIZE);
801 		assert(false);
802 	}
803 
804 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
805 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
806 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
807 	spdk_dma_free(g_bdev_mgr.zero_buffer);
808 
809 	cb_fn(g_fini_cb_arg);
810 	g_fini_cb_fn = NULL;
811 	g_fini_cb_arg = NULL;
812 }
813 
814 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
815 
816 static void
817 spdk_bdev_module_finish_iter(void *arg)
818 {
819 	struct spdk_bdev_module *bdev_module;
820 
821 	/* Start iterating from the last touched module */
822 	if (!g_resume_bdev_module) {
823 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
824 	} else {
825 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
826 	}
827 
828 	while (bdev_module) {
829 		if (bdev_module->async_fini) {
830 			/* Save our place so we can resume later. We must
831 			 * save the variable here, before calling module_fini()
832 			 * below, because in some cases the module may immediately
833 			 * call spdk_bdev_module_finish_done() and re-enter
834 			 * this function to continue iterating. */
835 			g_resume_bdev_module = bdev_module;
836 		}
837 
838 		if (bdev_module->module_fini) {
839 			bdev_module->module_fini();
840 		}
841 
842 		if (bdev_module->async_fini) {
843 			return;
844 		}
845 
846 		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
847 	}
848 
849 	g_resume_bdev_module = NULL;
850 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
851 }
852 
853 void
854 spdk_bdev_module_finish_done(void)
855 {
856 	if (spdk_get_thread() != g_fini_thread) {
857 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
858 	} else {
859 		spdk_bdev_module_finish_iter(NULL);
860 	}
861 }
862 
863 static void
864 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
865 {
866 	struct spdk_bdev *bdev = cb_arg;
867 
868 	if (bdeverrno && bdev) {
869 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
870 			     bdev->name);
871 
872 		/*
873 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
874 		 *  bdev; manually remove it from the list and continue with the next bdev
875 		 *  in the list.
876 		 */
877 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
878 	}
879 
880 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
881 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
882 		/*
883 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
884 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
885 		 * after returning.
886 		 */
887 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
888 		return;
889 	}
890 
891 	/*
892 	 * Unregister the first bdev in the list.
893 	 *
894 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
895 	 *  calling the remove_cb of the descriptors first.
896 	 *
897 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
898 	 *  will be called again via the unregister completion callback to continue the cleanup
899 	 *  process with the next bdev.
900 	 */
901 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
902 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
903 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
904 }
905 
906 void
907 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
908 {
909 	assert(cb_fn != NULL);
910 
911 	g_fini_thread = spdk_get_thread();
912 
913 	g_fini_cb_fn = cb_fn;
914 	g_fini_cb_arg = cb_arg;
915 
916 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
917 }
918 
919 static struct spdk_bdev_io *
920 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
921 {
922 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
923 	struct spdk_bdev_io *bdev_io;
924 
925 	if (ch->per_thread_cache_count > 0) {
926 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
927 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
928 		ch->per_thread_cache_count--;
929 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
930 		/*
931 		 * Don't try to look for bdev_ios in the global pool if there are
932 		 * waiters on bdev_ios - we don't want this caller to jump the line.
933 		 */
934 		bdev_io = NULL;
935 	} else {
936 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
937 	}
938 
939 	return bdev_io;
940 }
941 
942 void
943 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
944 {
945 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
946 
947 	assert(bdev_io != NULL);
948 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
949 
950 	if (bdev_io->internal.buf != NULL) {
951 		spdk_bdev_io_put_buf(bdev_io);
952 	}
953 
954 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
955 		ch->per_thread_cache_count++;
956 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
957 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
958 			struct spdk_bdev_io_wait_entry *entry;
959 
960 			entry = TAILQ_FIRST(&ch->io_wait_queue);
961 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
962 			entry->cb_fn(entry->cb_arg);
963 		}
964 	} else {
965 		/* We should never have a full cache with entries on the io wait queue. */
966 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
967 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
968 	}
969 }
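/*
 * Draining the io_wait_queue above is the retry half of the -ENOMEM protocol.
 * A hedged caller-side sketch (retry_read, read_done, my_submit_read and
 * my_ctx are hypothetical; entries are queued with spdk_bdev_queue_io_wait()):
 *
 *	static void
 *	retry_read(void *arg)
 *	{
 *		struct my_ctx *ctx = arg;
 *
 *		// Re-issue the I/O that previously failed with -ENOMEM.
 *		my_submit_read(ctx);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, ch, buf, offset, num_blocks, read_done, ctx);
 *	if (rc == -ENOMEM) {
 *		ctx->bdev_io_wait.bdev = bdev;
 *		ctx->bdev_io_wait.cb_fn = retry_read;
 *		ctx->bdev_io_wait.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, ch, &ctx->bdev_io_wait);
 *	}
 */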
970 
971 static uint64_t
972 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
973 {
974 	struct spdk_bdev	*bdev = bdev_io->bdev;
975 
976 	switch (bdev_io->type) {
977 	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
978 	case SPDK_BDEV_IO_TYPE_NVME_IO:
979 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
980 		return bdev_io->u.nvme_passthru.nbytes;
981 	case SPDK_BDEV_IO_TYPE_READ:
982 	case SPDK_BDEV_IO_TYPE_WRITE:
983 	case SPDK_BDEV_IO_TYPE_UNMAP:
984 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
985 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
986 	default:
987 		return 0;
988 	}
989 }
990 
991 static void
992 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
993 {
994 	struct spdk_bdev_io		*bdev_io = NULL;
995 	struct spdk_bdev		*bdev = ch->bdev;
996 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
997 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
998 
999 	while (!TAILQ_EMPTY(&qos->queued)) {
1000 		if (qos->max_ios_per_timeslice > 0 &&
1001 		    qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) {
1002 			break;
1003 		}
1004 
1005 		if (qos->max_byte_per_timeslice > 0 &&
1006 		    qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) {
1007 			break;
1008 		}
1009 
1010 		bdev_io = TAILQ_FIRST(&qos->queued);
1011 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1012 		qos->io_submitted_this_timeslice++;
1013 		qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io);
1014 		ch->io_outstanding++;
1015 		shared_resource->io_outstanding++;
1016 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1017 	}
1018 }
1019 
1020 static void
1021 _spdk_bdev_io_submit(void *ctx)
1022 {
1023 	struct spdk_bdev_io *bdev_io = ctx;
1024 	struct spdk_bdev *bdev = bdev_io->bdev;
1025 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1026 	struct spdk_io_channel *ch = bdev_ch->channel;
1027 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1028 
1029 	bdev_io->internal.submit_tsc = spdk_get_ticks();
1030 	bdev_ch->io_outstanding++;
1031 	shared_resource->io_outstanding++;
1032 	bdev_io->internal.in_submit_request = true;
1033 	if (spdk_likely(bdev_ch->flags == 0)) {
1034 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1035 			bdev->fn_table->submit_request(ch, bdev_io);
1036 		} else {
1037 			bdev_ch->io_outstanding--;
1038 			shared_resource->io_outstanding--;
1039 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1040 		}
1041 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1042 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1043 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1044 		bdev_ch->io_outstanding--;
1045 		shared_resource->io_outstanding--;
1046 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1047 		_spdk_bdev_qos_io_submit(bdev_ch);
1048 	} else {
1049 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1050 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1051 	}
1052 	bdev_io->internal.in_submit_request = false;
1053 }
1054 
1055 static void
1056 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1057 {
1058 	struct spdk_bdev *bdev = bdev_io->bdev;
1059 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1060 
1061 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1062 
1063 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1064 		if (thread == bdev->internal.qos->thread) {
1065 			_spdk_bdev_io_submit(bdev_io);
1066 		} else {
1067 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1068 			bdev_io->internal.ch = bdev->internal.qos->ch;
1069 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1070 		}
1071 	} else {
1072 		_spdk_bdev_io_submit(bdev_io);
1073 	}
1074 }
1075 
1076 static void
1077 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1078 {
1079 	struct spdk_bdev *bdev = bdev_io->bdev;
1080 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1081 	struct spdk_io_channel *ch = bdev_ch->channel;
1082 
1083 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1084 
1085 	bdev_io->internal.in_submit_request = true;
1086 	bdev->fn_table->submit_request(ch, bdev_io);
1087 	bdev_io->internal.in_submit_request = false;
1088 }
1089 
1090 static void
1091 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1092 		  struct spdk_bdev *bdev, void *cb_arg,
1093 		  spdk_bdev_io_completion_cb cb)
1094 {
1095 	bdev_io->bdev = bdev;
1096 	bdev_io->internal.caller_ctx = cb_arg;
1097 	bdev_io->internal.cb = cb;
1098 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1099 	bdev_io->internal.in_submit_request = false;
1100 	bdev_io->internal.buf = NULL;
1101 	bdev_io->internal.io_submit_ch = NULL;
1102 }
1103 
1104 static bool
1105 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1106 {
1107 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1108 }
1109 
1110 bool
1111 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1112 {
1113 	bool supported;
1114 
1115 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1116 
1117 	if (!supported) {
1118 		switch (io_type) {
1119 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1120 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1121 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1122 			break;
1123 		default:
1124 			break;
1125 		}
1126 	}
1127 
1128 	return supported;
1129 }
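/*
 * Example: a bdev module that implements WRITE but not WRITE_ZEROES still
 * reports WRITE_ZEROES as supported here, because the generic layer emulates
 * it (see spdk_bdev_write_zeroes_blocks() below, which writes chunks of
 * g_bdev_mgr.zero_buffer of up to ZERO_BUFFER_SIZE bytes).
 */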
1130 
1131 int
1132 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1133 {
1134 	if (bdev->fn_table->dump_info_json) {
1135 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1136 	}
1137 
1138 	return 0;
1139 }
1140 
1141 void
1142 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1143 {
1144 	assert(bdev != NULL);
1145 	assert(w != NULL);
1146 
1147 	if (bdev->fn_table->write_config_json) {
1148 		bdev->fn_table->write_config_json(bdev, w);
1149 	} else {
1150 		spdk_json_write_object_begin(w);
1151 		spdk_json_write_named_string(w, "name", bdev->name);
1152 		spdk_json_write_object_end(w);
1153 	}
1154 }
1155 
1156 static void
1157 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1158 {
1159 	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;
1160 
1161 	if (qos->iops_rate_limit > 0) {
1162 		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1163 					SPDK_BDEV_SEC_TO_USEC;
1164 		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
1165 						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1166 	}
1167 
1168 	if (qos->byte_rate_limit > 0) {
1169 		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1170 					 SPDK_BDEV_SEC_TO_USEC;
1171 		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
1172 						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
1173 	}
1174 }
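/*
 * Worked example: with iops_rate_limit = 10000 and the 1000 usec timeslice,
 * max_ios_per_timeslice = 10000 * 1000 / 1000000 = 10. With byte_rate_limit =
 * 10 MiB/s (10485760 bytes/s), max_byte_per_timeslice = 10485760 * 1000 /
 * 1000000 = 10485. Both exceed the enforced minimums of 1 I/O and 512 bytes
 * per timeslice.
 */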
1175 
1176 static int
1177 spdk_bdev_channel_poll_qos(void *arg)
1178 {
1179 	struct spdk_bdev_qos *qos = arg;
1180 
1181 	/* Reset for next round of rate limiting */
1182 	qos->io_submitted_this_timeslice = 0;
1183 	qos->byte_submitted_this_timeslice = 0;
1184 
1185 	_spdk_bdev_qos_io_submit(qos->ch);
1186 
1187 	return -1;
1188 }
1189 
1190 static void
1191 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1192 {
1193 	struct spdk_bdev_shared_resource *shared_resource;
1194 
1195 	if (!ch) {
1196 		return;
1197 	}
1198 
1199 	if (ch->channel) {
1200 		spdk_put_io_channel(ch->channel);
1201 	}
1202 
1203 	assert(ch->io_outstanding == 0);
1204 
1205 	shared_resource = ch->shared_resource;
1206 	if (shared_resource) {
1207 		assert(ch->io_outstanding == 0);
1208 		assert(shared_resource->ref > 0);
1209 		shared_resource->ref--;
1210 		if (shared_resource->ref == 0) {
1211 			assert(shared_resource->io_outstanding == 0);
1212 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1213 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1214 			free(shared_resource);
1215 		}
1216 	}
1217 }
1218 
1219 /* Caller must hold bdev->internal.mutex. */
1220 static int
1221 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1222 {
1223 	struct spdk_bdev_qos *qos = bdev->internal.qos;
1224 
1225 	/* Rate limiting is enabled on this bdev */
1226 	if (qos) {
1227 		if (qos->ch == NULL) {
1228 			struct spdk_io_channel *io_ch;
1229 
1230 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1231 				      bdev->name, spdk_get_thread());
1232 
1233 			/* No qos channel has been selected, so set one up */
1234 
1235 			/* Take another reference to ch */
1236 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1237 			qos->ch = ch;
1238 
1239 			qos->thread = spdk_io_channel_get_thread(io_ch);
1240 
1241 			TAILQ_INIT(&qos->queued);
1242 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1243 			qos->io_submitted_this_timeslice = 0;
1244 			qos->byte_submitted_this_timeslice = 0;
1245 
1246 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1247 							   qos,
1248 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1249 		}
1250 
1251 		ch->flags |= BDEV_CH_QOS_ENABLED;
1252 	}
1253 
1254 	return 0;
1255 }
1256 
1257 static int
1258 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1259 {
1260 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1261 	struct spdk_bdev_channel	*ch = ctx_buf;
1262 	struct spdk_io_channel		*mgmt_io_ch;
1263 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1264 	struct spdk_bdev_shared_resource *shared_resource;
1265 
1266 	ch->bdev = bdev;
1267 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1268 	if (!ch->channel) {
1269 		return -1;
1270 	}
1271 
1272 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1273 	if (!mgmt_io_ch) {
1274 		return -1;
1275 	}
1276 
1277 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1278 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1279 		if (shared_resource->shared_ch == ch->channel) {
1280 			spdk_put_io_channel(mgmt_io_ch);
1281 			shared_resource->ref++;
1282 			break;
1283 		}
1284 	}
1285 
1286 	if (shared_resource == NULL) {
1287 		shared_resource = calloc(1, sizeof(*shared_resource));
1288 		if (shared_resource == NULL) {
1289 			spdk_put_io_channel(mgmt_io_ch);
1290 			return -1;
1291 		}
1292 
1293 		shared_resource->mgmt_ch = mgmt_ch;
1294 		shared_resource->io_outstanding = 0;
1295 		TAILQ_INIT(&shared_resource->nomem_io);
1296 		shared_resource->nomem_threshold = 0;
1297 		shared_resource->shared_ch = ch->channel;
1298 		shared_resource->ref = 1;
1299 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1300 	}
1301 
1302 	memset(&ch->stat, 0, sizeof(ch->stat));
1303 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1304 	ch->io_outstanding = 0;
1305 	TAILQ_INIT(&ch->queued_resets);
1306 	ch->flags = 0;
1307 	ch->shared_resource = shared_resource;
1308 
1309 #ifdef SPDK_CONFIG_VTUNE
1310 	{
1311 		char *name;
1312 		__itt_init_ittlib(NULL, 0);
1313 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1314 		if (!name) {
1315 			_spdk_bdev_channel_destroy_resource(ch);
1316 			return -1;
1317 		}
1318 		ch->handle = __itt_string_handle_create(name);
1319 		free(name);
1320 		ch->start_tsc = spdk_get_ticks();
1321 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1322 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1323 	}
1324 #endif
1325 
1326 	pthread_mutex_lock(&bdev->internal.mutex);
1327 
1328 	if (_spdk_bdev_enable_qos(bdev, ch)) {
1329 		_spdk_bdev_channel_destroy_resource(ch);
1330 		pthread_mutex_unlock(&bdev->internal.mutex);
1331 		return -1;
1332 	}
1333 
1334 	pthread_mutex_unlock(&bdev->internal.mutex);
1335 
1336 	return 0;
1337 }
1338 
1339 /*
1340  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1341  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1342  */
1343 static void
1344 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1345 {
1346 	bdev_io_stailq_t tmp;
1347 	struct spdk_bdev_io *bdev_io;
1348 
1349 	STAILQ_INIT(&tmp);
1350 
1351 	while (!STAILQ_EMPTY(queue)) {
1352 		bdev_io = STAILQ_FIRST(queue);
1353 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1354 		if (bdev_io->internal.ch == ch) {
1355 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1356 		} else {
1357 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1358 		}
1359 	}
1360 
1361 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1362 }
1363 
1364 /*
1365  * Abort I/O that are queued waiting for submission.  These types of I/O are
1366  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1367  */
1368 static void
1369 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1370 {
1371 	struct spdk_bdev_io *bdev_io, *tmp;
1372 
1373 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1374 		if (bdev_io->internal.ch == ch) {
1375 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1376 			/*
1377 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1378 			 *  been submitted to the bdev module.  Since in this case it
1379 			 *  hadn't, bump io_outstanding to account for the decrement
1380 			 *  that spdk_bdev_io_complete() will do.
1381 			 */
1382 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1383 				ch->io_outstanding++;
1384 				ch->shared_resource->io_outstanding++;
1385 			}
1386 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1387 		}
1388 	}
1389 }
1390 
1391 static void
1392 spdk_bdev_qos_channel_destroy(void *cb_arg)
1393 {
1394 	struct spdk_bdev_qos *qos = cb_arg;
1395 
1396 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1397 	spdk_poller_unregister(&qos->poller);
1398 
1399 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1400 
1401 	free(qos);
1402 }
1403 
1404 static int
1405 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1406 {
1407 	/*
1408 	 * Cleanly shutting down the QoS poller is tricky, because
1409 	 * during the asynchronous operation the user could open
1410 	 * a new descriptor and create a new channel, spawning
1411 	 * a new QoS poller.
1412 	 *
1413 	 * The strategy is to create a new QoS structure here and swap it
1414 	 * in. The shutdown path then continues to refer to the old one
1415 	 * until it completes and then releases it.
1416 	 */
1417 	struct spdk_bdev_qos *new_qos, *old_qos;
1418 
1419 	old_qos = bdev->internal.qos;
1420 
1421 	new_qos = calloc(1, sizeof(*new_qos));
1422 	if (!new_qos) {
1423 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1424 		return -ENOMEM;
1425 	}
1426 
1427 	/* Copy the old QoS data into the newly allocated structure */
1428 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1429 
1430 	/* Zero out the key parts of the QoS structure */
1431 	new_qos->ch = NULL;
1432 	new_qos->thread = NULL;
1433 	new_qos->max_ios_per_timeslice = 0;
1434 	new_qos->max_byte_per_timeslice = 0;
1435 	new_qos->io_submitted_this_timeslice = 0;
1436 	new_qos->byte_submitted_this_timeslice = 0;
1437 	new_qos->poller = NULL;
1438 	TAILQ_INIT(&new_qos->queued);
1439 
1440 	bdev->internal.qos = new_qos;
1441 
1442 	spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1443 			     old_qos);
1444 
1445 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1446 	 * been destroyed yet. The destruction path will end up waiting for the final
1447 	 * channel to be put before it releases resources. */
1448 
1449 	return 0;
1450 }
1451 
1452 static void
1453 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1454 {
1455 	struct spdk_bdev_channel	*ch = ctx_buf;
1456 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1457 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1458 
1459 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1460 		      spdk_get_thread());
1461 
1462 	mgmt_ch = shared_resource->mgmt_ch;
1463 
1464 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1465 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1466 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1467 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1468 
1469 	_spdk_bdev_channel_destroy_resource(ch);
1470 }
1471 
1472 int
1473 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1474 {
1475 	struct spdk_bdev_alias *tmp;
1476 
1477 	if (alias == NULL) {
1478 		SPDK_ERRLOG("Empty alias passed\n");
1479 		return -EINVAL;
1480 	}
1481 
1482 	if (spdk_bdev_get_by_name(alias)) {
1483 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1484 		return -EEXIST;
1485 	}
1486 
1487 	tmp = calloc(1, sizeof(*tmp));
1488 	if (tmp == NULL) {
1489 		SPDK_ERRLOG("Unable to allocate alias\n");
1490 		return -ENOMEM;
1491 	}
1492 
1493 	tmp->alias = strdup(alias);
1494 	if (tmp->alias == NULL) {
1495 		free(tmp);
1496 		SPDK_ERRLOG("Unable to allocate alias\n");
1497 		return -ENOMEM;
1498 	}
1499 
1500 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1501 
1502 	return 0;
1503 }
1504 
1505 int
1506 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1507 {
1508 	struct spdk_bdev_alias *tmp;
1509 
1510 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1511 		if (strcmp(alias, tmp->alias) == 0) {
1512 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1513 			free(tmp->alias);
1514 			free(tmp);
1515 			return 0;
1516 		}
1517 	}
1518 
1519 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1520 
1521 	return -ENOENT;
1522 }
1523 
1524 struct spdk_io_channel *
1525 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1526 {
1527 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1528 }
1529 
1530 const char *
1531 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1532 {
1533 	return bdev->name;
1534 }
1535 
1536 const char *
1537 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1538 {
1539 	return bdev->product_name;
1540 }
1541 
1542 const struct spdk_bdev_aliases_list *
1543 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1544 {
1545 	return &bdev->aliases;
1546 }
1547 
1548 uint32_t
1549 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1550 {
1551 	return bdev->blocklen;
1552 }
1553 
1554 uint64_t
1555 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1556 {
1557 	return bdev->blockcnt;
1558 }
1559 
1560 uint64_t
1561 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
1562 {
1563 	uint64_t iops_rate_limit = 0;
1564 
1565 	pthread_mutex_lock(&bdev->internal.mutex);
1566 	if (bdev->internal.qos) {
1567 		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
1568 	}
1569 	pthread_mutex_unlock(&bdev->internal.mutex);
1570 
1571 	return iops_rate_limit;
1572 }
1573 
1574 size_t
1575 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1576 {
1577 	/* TODO: push this logic down to the bdev modules */
1578 	if (bdev->need_aligned_buffer) {
1579 		return bdev->blocklen;
1580 	}
1581 
1582 	return 1;
1583 }
1584 
1585 uint32_t
1586 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1587 {
1588 	return bdev->optimal_io_boundary;
1589 }
1590 
1591 bool
1592 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1593 {
1594 	return bdev->write_cache;
1595 }
1596 
1597 const struct spdk_uuid *
1598 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1599 {
1600 	return &bdev->uuid;
1601 }
1602 
1603 int
1604 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1605 {
1606 	int ret;
1607 
1608 	pthread_mutex_lock(&bdev->internal.mutex);
1609 
1610 	/* bdev has open descriptors */
1611 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
1612 	    bdev->blockcnt > size) {
1613 		ret = -EBUSY;
1614 	} else {
1615 		bdev->blockcnt = size;
1616 		ret = 0;
1617 	}
1618 
1619 	pthread_mutex_unlock(&bdev->internal.mutex);
1620 
1621 	return ret;
1622 }
1623 
1624 /*
1625  * Convert I/O offset and length from bytes to blocks.
1626  *
1627  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1628  */
1629 static uint64_t
1630 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1631 			  uint64_t num_bytes, uint64_t *num_blocks)
1632 {
1633 	uint32_t block_size = bdev->blocklen;
1634 
1635 	*offset_blocks = offset_bytes / block_size;
1636 	*num_blocks = num_bytes / block_size;
1637 
1638 	return (offset_bytes % block_size) | (num_bytes % block_size);
1639 }
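/*
 * The return value ORs the two remainders, so it is zero only when both
 * offset_bytes and num_bytes are block-aligned. With blocklen = 512:
 * offset_bytes = 4096, num_bytes = 1024 gives offset_blocks = 8,
 * num_blocks = 2, return 0; offset_bytes = 4100 returns nonzero and the
 * caller fails with -EINVAL.
 */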
1640 
1641 static bool
1642 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1643 {
1644 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this
1645 	 * indicates an overflow, meaning the offset has wrapped around. */
1646 	if (offset_blocks + num_blocks < offset_blocks) {
1647 		return false;
1648 	}
1649 
1650 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1651 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1652 		return false;
1653 	}
1654 
1655 	return true;
1656 }
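/*
 * Example: offset_blocks = UINT64_MAX and num_blocks = 2 wraps to 1, so the
 * first check rejects it even though the wrapped sum would pass the blockcnt
 * comparison.
 */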
1657 
1658 int
1659 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1660 	       void *buf, uint64_t offset, uint64_t nbytes,
1661 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1662 {
1663 	uint64_t offset_blocks, num_blocks;
1664 
1665 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1666 		return -EINVAL;
1667 	}
1668 
1669 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1670 }
1671 
1672 int
1673 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1674 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1675 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1676 {
1677 	struct spdk_bdev *bdev = desc->bdev;
1678 	struct spdk_bdev_io *bdev_io;
1679 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1680 
1681 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1682 		return -EINVAL;
1683 	}
1684 
1685 	bdev_io = spdk_bdev_get_io(channel);
1686 	if (!bdev_io) {
1687 		return -ENOMEM;
1688 	}
1689 
1690 	bdev_io->internal.ch = channel;
1691 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1692 	bdev_io->u.bdev.iov.iov_base = buf;
1693 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1694 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1695 	bdev_io->u.bdev.iovcnt = 1;
1696 	bdev_io->u.bdev.num_blocks = num_blocks;
1697 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1698 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1699 
1700 	spdk_bdev_io_submit(bdev_io);
1701 	return 0;
1702 }
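/*
 * A minimal caller-side sketch of a single-block read (read_complete and
 * my_ctx are hypothetical; buf must satisfy spdk_bdev_get_buf_align()):
 *
 *	static void
 *	read_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		struct my_ctx *ctx = cb_arg;
 *
 *		ctx->rc = success ? 0 : -EIO;
 *		// Always return the bdev_io, even on failure.
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, ch, buf, 0, 1, read_complete, ctx);
 *	if (rc == -ENOMEM) {
 *		// Queue a retry; see the spdk_bdev_queue_io_wait() sketch above.
 *	}
 */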
1703 
1704 int
1705 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1706 		struct iovec *iov, int iovcnt,
1707 		uint64_t offset, uint64_t nbytes,
1708 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1709 {
1710 	uint64_t offset_blocks, num_blocks;
1711 
1712 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1713 		return -EINVAL;
1714 	}
1715 
1716 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1717 }
1718 
1719 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1720 			   struct iovec *iov, int iovcnt,
1721 			   uint64_t offset_blocks, uint64_t num_blocks,
1722 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1723 {
1724 	struct spdk_bdev *bdev = desc->bdev;
1725 	struct spdk_bdev_io *bdev_io;
1726 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1727 
1728 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1729 		return -EINVAL;
1730 	}
1731 
1732 	bdev_io = spdk_bdev_get_io(channel);
1733 	if (!bdev_io) {
1734 		return -ENOMEM;
1735 	}
1736 
1737 	bdev_io->internal.ch = channel;
1738 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1739 	bdev_io->u.bdev.iovs = iov;
1740 	bdev_io->u.bdev.iovcnt = iovcnt;
1741 	bdev_io->u.bdev.num_blocks = num_blocks;
1742 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1743 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1744 
1745 	spdk_bdev_io_submit(bdev_io);
1746 	return 0;
1747 }
1748 
1749 int
1750 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1751 		void *buf, uint64_t offset, uint64_t nbytes,
1752 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1753 {
1754 	uint64_t offset_blocks, num_blocks;
1755 
1756 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1757 		return -EINVAL;
1758 	}
1759 
1760 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1761 }
1762 
1763 int
1764 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1765 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1766 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1767 {
1768 	struct spdk_bdev *bdev = desc->bdev;
1769 	struct spdk_bdev_io *bdev_io;
1770 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1771 
1772 	if (!desc->write) {
1773 		return -EBADF;
1774 	}
1775 
1776 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1777 		return -EINVAL;
1778 	}
1779 
1780 	bdev_io = spdk_bdev_get_io(channel);
1781 	if (!bdev_io) {
1782 		return -ENOMEM;
1783 	}
1784 
1785 	bdev_io->internal.ch = channel;
1786 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1787 	bdev_io->u.bdev.iov.iov_base = buf;
1788 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1789 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1790 	bdev_io->u.bdev.iovcnt = 1;
1791 	bdev_io->u.bdev.num_blocks = num_blocks;
1792 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1793 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1794 
1795 	spdk_bdev_io_submit(bdev_io);
1796 	return 0;
1797 }
1798 
1799 int
1800 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1801 		 struct iovec *iov, int iovcnt,
1802 		 uint64_t offset, uint64_t len,
1803 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1804 {
1805 	uint64_t offset_blocks, num_blocks;
1806 
1807 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1808 		return -EINVAL;
1809 	}
1810 
1811 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1812 }
1813 
1814 int
1815 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1816 			struct iovec *iov, int iovcnt,
1817 			uint64_t offset_blocks, uint64_t num_blocks,
1818 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1819 {
1820 	struct spdk_bdev *bdev = desc->bdev;
1821 	struct spdk_bdev_io *bdev_io;
1822 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1823 
1824 	if (!desc->write) {
1825 		return -EBADF;
1826 	}
1827 
1828 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1829 		return -EINVAL;
1830 	}
1831 
1832 	bdev_io = spdk_bdev_get_io(channel);
1833 	if (!bdev_io) {
1834 		return -ENOMEM;
1835 	}
1836 
1837 	bdev_io->internal.ch = channel;
1838 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1839 	bdev_io->u.bdev.iovs = iov;
1840 	bdev_io->u.bdev.iovcnt = iovcnt;
1841 	bdev_io->u.bdev.num_blocks = num_blocks;
1842 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1843 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1844 
1845 	spdk_bdev_io_submit(bdev_io);
1846 	return 0;
1847 }
1848 
1849 int
1850 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1851 		       uint64_t offset, uint64_t len,
1852 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1853 {
1854 	uint64_t offset_blocks, num_blocks;
1855 
1856 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1857 		return -EINVAL;
1858 	}
1859 
1860 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1861 }
1862 
1863 int
1864 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1865 			      uint64_t offset_blocks, uint64_t num_blocks,
1866 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1867 {
1868 	struct spdk_bdev *bdev = desc->bdev;
1869 	struct spdk_bdev_io *bdev_io;
1870 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1871 	uint64_t len;
1872 	bool split_request = false;
1873 
1874 	if (!desc->write) {
1875 		return -EBADF;
1876 	}
1877 
1878 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1879 		return -EINVAL;
1880 	}
1881 
1882 	bdev_io = spdk_bdev_get_io(channel);
1883 
1884 	if (!bdev_io) {
1885 		return -ENOMEM;
1886 	}
1887 
1888 	bdev_io->internal.ch = channel;
1889 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1890 
1891 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1892 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1893 		bdev_io->u.bdev.num_blocks = num_blocks;
1894 		bdev_io->u.bdev.iovs = NULL;
1895 		bdev_io->u.bdev.iovcnt = 0;
1896 
1897 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
1898 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1899 
1900 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1901 
1902 		if (len > ZERO_BUFFER_SIZE) {
1903 			split_request = true;
1904 			len = ZERO_BUFFER_SIZE;
1905 		}
1906 
1907 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1908 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1909 		bdev_io->u.bdev.iov.iov_len = len;
1910 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1911 		bdev_io->u.bdev.iovcnt = 1;
1912 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1913 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1914 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1915 	} else {
1916 		spdk_bdev_free_io(bdev_io);
1917 		return -ENOTSUP;
1918 	}
1919 
1920 	if (split_request) {
1921 		bdev_io->u.bdev.stored_user_cb = cb;
1922 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1923 	} else {
1924 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1925 	}
1926 	spdk_bdev_io_submit(bdev_io);
1927 	return 0;
1928 }
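
/*
 * A worked example of the fallback path above (numbers are illustrative):
 * with a 512-byte block size, ZERO_BUFFER_SIZE (1 MiB) covers 2048 blocks.
 * A write_zeroes request for 5000 blocks on a bdev that only supports
 * WRITE is therefore issued as a chain of plain writes from the shared
 * zero buffer:
 *
 *	round 1: offset_blocks = off,        num_blocks = 2048, remaining = 2952
 *	round 2: offset_blocks = off + 2048, num_blocks = 2048, remaining = 904
 *	round 3: offset_blocks = off + 4096, num_blocks = 904,  remaining = 0
 *
 * Rounds 2 and 3 are driven by spdk_bdev_write_zeroes_split() below; the
 * user's callback only fires once the final round completes.
 */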
1929 
1930 int
1931 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1932 		uint64_t offset, uint64_t nbytes,
1933 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1934 {
1935 	uint64_t offset_blocks, num_blocks;
1936 
1937 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1938 		return -EINVAL;
1939 	}
1940 
1941 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1942 }
1943 
1944 int
1945 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1946 		       uint64_t offset_blocks, uint64_t num_blocks,
1947 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1948 {
1949 	struct spdk_bdev *bdev = desc->bdev;
1950 	struct spdk_bdev_io *bdev_io;
1951 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1952 
1953 	if (!desc->write) {
1954 		return -EBADF;
1955 	}
1956 
1957 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1958 		return -EINVAL;
1959 	}
1960 
1961 	if (num_blocks == 0) {
1962 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1963 		return -EINVAL;
1964 	}
1965 
1966 	bdev_io = spdk_bdev_get_io(channel);
1967 	if (!bdev_io) {
1968 		return -ENOMEM;
1969 	}
1970 
1971 	bdev_io->internal.ch = channel;
1972 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1973 	bdev_io->u.bdev.iov.iov_base = NULL;
1974 	bdev_io->u.bdev.iov.iov_len = 0;
1975 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1976 	bdev_io->u.bdev.iovcnt = 1;
1977 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1978 	bdev_io->u.bdev.num_blocks = num_blocks;
1979 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1980 
1981 	spdk_bdev_io_submit(bdev_io);
1982 	return 0;
1983 }
1984 
1985 int
1986 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1987 		uint64_t offset, uint64_t length,
1988 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1989 {
1990 	uint64_t offset_blocks, num_blocks;
1991 
1992 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1993 		return -EINVAL;
1994 	}
1995 
1996 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1997 }
1998 
1999 int
2000 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2001 		       uint64_t offset_blocks, uint64_t num_blocks,
2002 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2003 {
2004 	struct spdk_bdev *bdev = desc->bdev;
2005 	struct spdk_bdev_io *bdev_io;
2006 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2007 
2008 	if (!desc->write) {
2009 		return -EBADF;
2010 	}
2011 
2012 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2013 		return -EINVAL;
2014 	}
2015 
2016 	bdev_io = spdk_bdev_get_io(channel);
2017 	if (!bdev_io) {
2018 		return -ENOMEM;
2019 	}
2020 
2021 	bdev_io->internal.ch = channel;
2022 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2023 	bdev_io->u.bdev.iovs = NULL;
2024 	bdev_io->u.bdev.iovcnt = 0;
2025 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2026 	bdev_io->u.bdev.num_blocks = num_blocks;
2027 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2028 
2029 	spdk_bdev_io_submit(bdev_io);
2030 	return 0;
2031 }
2032 
2033 static void
2034 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2035 {
2036 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2037 	struct spdk_bdev_io *bdev_io;
2038 
2039 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2040 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2041 	spdk_bdev_io_submit_reset(bdev_io);
2042 }
2043 
2044 static void
2045 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2046 {
2047 	struct spdk_io_channel		*ch;
2048 	struct spdk_bdev_channel	*channel;
2049 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2050 	struct spdk_bdev_shared_resource *shared_resource;
2051 	bdev_io_tailq_t			tmp_queued;
2052 
2053 	TAILQ_INIT(&tmp_queued);
2054 
2055 	ch = spdk_io_channel_iter_get_channel(i);
2056 	channel = spdk_io_channel_get_ctx(ch);
2057 	shared_resource = channel->shared_resource;
2058 	mgmt_channel = shared_resource->mgmt_ch;
2059 
2060 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2061 
2062 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2063 		/* The QoS object is always valid and readable while
2064 		 * the channel flag is set, so the lock here should not
2065 		 * be necessary. We're not in the fast path though, so
2066 		 * just take it anyway. */
2067 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2068 		if (channel->bdev->internal.qos->ch == channel) {
2069 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2070 		}
2071 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2072 	}
2073 
2074 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2075 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2076 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2077 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2078 
2079 	spdk_for_each_channel_continue(i, 0);
2080 }
2081 
2082 static void
2083 _spdk_bdev_start_reset(void *ctx)
2084 {
2085 	struct spdk_bdev_channel *ch = ctx;
2086 
2087 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2088 			      ch, _spdk_bdev_reset_dev);
2089 }
2090 
2091 static void
2092 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2093 {
2094 	struct spdk_bdev *bdev = ch->bdev;
2095 
2096 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2097 
2098 	pthread_mutex_lock(&bdev->internal.mutex);
2099 	if (bdev->internal.reset_in_progress == NULL) {
2100 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2101 		/*
2102 		 * Take a channel reference for the target bdev for the life of this
2103 		 *  reset.  This guards against the channel getting destroyed while
2104 		 *  spdk_for_each_channel() calls related to this reset IO are in
2105 		 *  progress.  We will release the reference when this reset is
2106 		 *  completed.
2107 		 */
2108 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2109 		_spdk_bdev_start_reset(ch);
2110 	}
2111 	pthread_mutex_unlock(&bdev->internal.mutex);
2112 }
2113 
2114 int
2115 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2116 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2117 {
2118 	struct spdk_bdev *bdev = desc->bdev;
2119 	struct spdk_bdev_io *bdev_io;
2120 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2121 
2122 	bdev_io = spdk_bdev_get_io(channel);
2123 	if (!bdev_io) {
2124 		return -ENOMEM;
2125 	}
2126 
2127 	bdev_io->internal.ch = channel;
2128 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2129 	bdev_io->u.reset.ch_ref = NULL;
2130 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2131 
2132 	pthread_mutex_lock(&bdev->internal.mutex);
2133 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2134 	pthread_mutex_unlock(&bdev->internal.mutex);
2135 
2136 	_spdk_bdev_channel_start_reset(channel);
2137 
2138 	return 0;
2139 }
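
/*
 * Usage sketch (illustrative; "my_reset_done" is a hypothetical name).
 * While the reset is in progress the channels are frozen, and I/O queued
 * behind the reset may complete with an error:
 *
 *	static void
 *	my_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	if (spdk_bdev_reset(desc, io_ch, my_reset_done, NULL) != 0) {
 *		(handle submission failure)
 *	}
 */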
2140 
2141 void
2142 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2143 		      struct spdk_bdev_io_stat *stat)
2144 {
2145 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2146 
2147 	*stat = channel->stat;
2148 }
2149 
2150 static void
2151 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2152 {
2153 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2154 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2155 
2156 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2157 			    bdev_iostat_ctx->cb_arg, 0);
2158 	free(bdev_iostat_ctx);
2159 }
2160 
2161 static void
2162 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2163 {
2164 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2165 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2166 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2167 
2168 	bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read;
2169 	bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops;
2170 	bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written;
2171 	bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops;
2172 
2173 	spdk_for_each_channel_continue(i, 0);
2174 }
2175 
2176 void
2177 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2178 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2179 {
2180 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2181 
2182 	assert(bdev != NULL);
2183 	assert(stat != NULL);
2184 	assert(cb != NULL);
2185 
2186 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2187 	if (bdev_iostat_ctx == NULL) {
2188 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2189 		cb(bdev, stat, cb_arg, -ENOMEM);
2190 		return;
2191 	}
2192 
2193 	bdev_iostat_ctx->stat = stat;
2194 	bdev_iostat_ctx->cb = cb;
2195 	bdev_iostat_ctx->cb_arg = cb_arg;
2196 
2197 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2198 			      _spdk_bdev_get_each_channel_stat,
2199 			      bdev_iostat_ctx,
2200 			      _spdk_bdev_get_device_stat_done);
2201 }
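
/*
 * Usage sketch (illustrative; "my_stat_done" is a hypothetical name).
 * Note that the per-channel counters are accumulated into *stat with "+=",
 * so the caller should zero the buffer first and keep it valid until the
 * callback runs:
 *
 *	static void
 *	my_stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
 *		     void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("bytes_read=%" PRIu64 "\n", stat->bytes_read);
 *		}
 *		free(stat);
 *	}
 *
 *	struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
 *	if (stat != NULL) {
 *		spdk_bdev_get_device_stat(bdev, stat, my_stat_done, NULL);
 *	}
 */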
2202 
2203 int
2204 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2205 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2206 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2207 {
2208 	struct spdk_bdev *bdev = desc->bdev;
2209 	struct spdk_bdev_io *bdev_io;
2210 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2211 
2212 	if (!desc->write) {
2213 		return -EBADF;
2214 	}
2215 
2216 	bdev_io = spdk_bdev_get_io(channel);
2217 	if (!bdev_io) {
2218 		return -ENOMEM;
2219 	}
2220 
2221 	bdev_io->internal.ch = channel;
2222 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2223 	bdev_io->u.nvme_passthru.cmd = *cmd;
2224 	bdev_io->u.nvme_passthru.buf = buf;
2225 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2226 	bdev_io->u.nvme_passthru.md_buf = NULL;
2227 	bdev_io->u.nvme_passthru.md_len = 0;
2228 
2229 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2230 
2231 	spdk_bdev_io_submit(bdev_io);
2232 	return 0;
2233 }
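
/*
 * Usage sketch (illustrative; "my_admin_done" is a hypothetical completion
 * callback): issuing an Identify Controller admin command through the
 * passthru interface. The 4096-byte buffer matches the size of the NVMe
 * identify data structure; error handling is elided:
 *
 *	struct spdk_nvme_cmd cmd = {};
 *	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *	cmd.cdw10 = SPDK_NVME_IDENTIFY_CTRLR;
 *	rc = spdk_bdev_nvme_admin_passthru(desc, ch, &cmd, buf, 4096,
 *					   my_admin_done, NULL);
 */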
2234 
2235 int
2236 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2237 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2238 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2239 {
2240 	struct spdk_bdev *bdev = desc->bdev;
2241 	struct spdk_bdev_io *bdev_io;
2242 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2243 
2244 	if (!desc->write) {
2245 		/*
2246 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2247 		 *  to easily determine if the command is a read or write, but for now just
2248 		 *  do not allow io_passthru with a read-only descriptor.
2249 		 */
2250 		return -EBADF;
2251 	}
2252 
2253 	bdev_io = spdk_bdev_get_io(channel);
2254 	if (!bdev_io) {
2255 		return -ENOMEM;
2256 	}
2257 
2258 	bdev_io->internal.ch = channel;
2259 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2260 	bdev_io->u.nvme_passthru.cmd = *cmd;
2261 	bdev_io->u.nvme_passthru.buf = buf;
2262 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2263 	bdev_io->u.nvme_passthru.md_buf = NULL;
2264 	bdev_io->u.nvme_passthru.md_len = 0;
2265 
2266 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2267 
2268 	spdk_bdev_io_submit(bdev_io);
2269 	return 0;
2270 }
2271 
2272 int
2273 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2274 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2275 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2276 {
2277 	struct spdk_bdev *bdev = desc->bdev;
2278 	struct spdk_bdev_io *bdev_io;
2279 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2280 
2281 	if (!desc->write) {
2282 		/*
2283 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2284 		 *  to easily determine if the command is a read or write, but for now just
2285 		 *  do not allow io_passthru with a read-only descriptor.
2286 		 */
2287 		return -EBADF;
2288 	}
2289 
2290 	bdev_io = spdk_bdev_get_io(channel);
2291 	if (!bdev_io) {
2292 		return -ENOMEM;
2293 	}
2294 
2295 	bdev_io->internal.ch = channel;
2296 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2297 	bdev_io->u.nvme_passthru.cmd = *cmd;
2298 	bdev_io->u.nvme_passthru.buf = buf;
2299 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2300 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2301 	bdev_io->u.nvme_passthru.md_len = md_len;
2302 
2303 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2304 
2305 	spdk_bdev_io_submit(bdev_io);
2306 	return 0;
2307 }
2308 
2309 int
2310 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2311 			struct spdk_bdev_io_wait_entry *entry)
2312 {
2313 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2314 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2315 
2316 	if (bdev != entry->bdev) {
2317 		SPDK_ERRLOG("bdevs do not match\n");
2318 		return -EINVAL;
2319 	}
2320 
2321 	if (mgmt_ch->per_thread_cache_count > 0) {
2322 		SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2323 		return -EINVAL;
2324 	}
2325 
2326 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2327 	return 0;
2328 }
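
/*
 * Typical ENOMEM retry pattern (a sketch; the "my_" names are hypothetical).
 * The caller embeds a struct spdk_bdev_io_wait_entry in its own context and
 * keeps it valid until the callback fires:
 *
 *	static void
 *	my_retry(void *arg)
 *	{
 *		struct my_io_ctx *ctx = arg;
 *
 *		my_submit(ctx);	(resubmits the original spdk_bdev_* call)
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, ch, buf, offset, num, my_done, ctx);
 *	if (rc == -ENOMEM) {
 *		ctx->wait_entry.bdev = bdev;
 *		ctx->wait_entry.cb_fn = my_retry;
 *		ctx->wait_entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, ch, &ctx->wait_entry);
 *	}
 */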
2329 
2330 static void
2331 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2332 {
2333 	struct spdk_bdev *bdev = bdev_ch->bdev;
2334 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2335 	struct spdk_bdev_io *bdev_io;
2336 
2337 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2338 		/*
2339 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2340 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2341 		 *  the context of a completion, because the resources for the I/O are
2342 		 *  not released until control returns to the bdev poller.  Also, we
2343 		 *  may require several small I/O to complete before a larger I/O
2344 		 *  (that requires splitting) can be submitted.
2345 		 */
2346 		return;
2347 	}
2348 
2349 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2350 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2351 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2352 		bdev_io->internal.ch->io_outstanding++;
2353 		shared_resource->io_outstanding++;
2354 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2355 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2356 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2357 			break;
2358 		}
2359 	}
2360 }
2361 
2362 static inline void
2363 _spdk_bdev_io_complete(void *ctx)
2364 {
2365 	struct spdk_bdev_io *bdev_io = ctx;
2366 
2367 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2368 		/*
2369 		 * Send the completion to the thread that originally submitted the I/O,
2370 		 * which may not be the current thread in the case of QoS.
2371 		 */
2372 		if (bdev_io->internal.io_submit_ch) {
2373 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2374 			bdev_io->internal.io_submit_ch = NULL;
2375 		}
2376 
2377 		/*
2378 		 * Defer completion to avoid potential infinite recursion if the
2379 		 * user's completion callback issues a new I/O.
2380 		 */
2381 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2382 				     _spdk_bdev_io_complete, bdev_io);
2383 		return;
2384 	}
2385 
2386 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2387 		switch (bdev_io->type) {
2388 		case SPDK_BDEV_IO_TYPE_READ:
2389 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2390 			bdev_io->internal.ch->stat.num_read_ops++;
2391 			bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2392 			break;
2393 		case SPDK_BDEV_IO_TYPE_WRITE:
2394 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2395 			bdev_io->internal.ch->stat.num_write_ops++;
2396 			bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2397 			break;
2398 		default:
2399 			break;
2400 		}
2401 	}
2402 
2403 #ifdef SPDK_CONFIG_VTUNE
2404 	uint64_t now_tsc = spdk_get_ticks();
2405 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2406 		uint64_t data[5];
2407 
2408 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2409 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2410 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2411 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2412 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2413 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2414 
2415 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2416 				   __itt_metadata_u64, 5, data);
2417 
2418 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2419 		bdev_io->internal.ch->start_tsc = now_tsc;
2420 	}
2421 #endif
2422 
2423 	assert(bdev_io->internal.cb != NULL);
2424 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2425 
2426 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2427 			     bdev_io->internal.caller_ctx);
2428 }
2429 
2430 static void
2431 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2432 {
2433 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2434 
2435 	if (bdev_io->u.reset.ch_ref != NULL) {
2436 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2437 		bdev_io->u.reset.ch_ref = NULL;
2438 	}
2439 
2440 	_spdk_bdev_io_complete(bdev_io);
2441 }
2442 
2443 static void
2444 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2445 {
2446 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2447 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2448 
2449 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2450 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2451 		_spdk_bdev_channel_start_reset(ch);
2452 	}
2453 
2454 	spdk_for_each_channel_continue(i, 0);
2455 }
2456 
2457 void
2458 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2459 {
2460 	struct spdk_bdev *bdev = bdev_io->bdev;
2461 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2462 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2463 
2464 	bdev_io->internal.status = status;
2465 
2466 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2467 		bool unlock_channels = false;
2468 
2469 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2470 			SPDK_ERRLOG("NOMEM returned for reset\n");
2471 		}
2472 		pthread_mutex_lock(&bdev->internal.mutex);
2473 		if (bdev_io == bdev->internal.reset_in_progress) {
2474 			bdev->internal.reset_in_progress = NULL;
2475 			unlock_channels = true;
2476 		}
2477 		pthread_mutex_unlock(&bdev->internal.mutex);
2478 
2479 		if (unlock_channels) {
2480 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2481 					      bdev_io, _spdk_bdev_reset_complete);
2482 			return;
2483 		}
2484 	} else {
2485 		assert(bdev_ch->io_outstanding > 0);
2486 		assert(shared_resource->io_outstanding > 0);
2487 		bdev_ch->io_outstanding--;
2488 		shared_resource->io_outstanding--;
2489 
2490 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2491 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2492 			/*
2493 			 * Wait for some of the outstanding I/O to complete before we
2494 			 *  retry any of the nomem_io.  Normally we will wait for
2495 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2496 			 *  depth channels we will instead wait for half to complete.
2497 			 */
2498 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2499 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
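			/* For example: with 100 I/O still outstanding the threshold is
			 *  spdk_max(50, 92) = 92, i.e. retry once 8 more I/O complete;
			 *  with only 10 outstanding it is spdk_max(5, 2) = 5, i.e. wait
			 *  for half of them to complete.
			 */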
2500 			return;
2501 		}
2502 
2503 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2504 			_spdk_bdev_ch_retry_io(bdev_ch);
2505 		}
2506 	}
2507 
2508 	_spdk_bdev_io_complete(bdev_io);
2509 }
2510 
2511 void
2512 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2513 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2514 {
2515 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2516 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2517 	} else {
2518 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2519 		bdev_io->internal.error.scsi.sc = sc;
2520 		bdev_io->internal.error.scsi.sk = sk;
2521 		bdev_io->internal.error.scsi.asc = asc;
2522 		bdev_io->internal.error.scsi.ascq = ascq;
2523 	}
2524 
2525 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2526 }
2527 
2528 void
2529 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2530 			     int *sc, int *sk, int *asc, int *ascq)
2531 {
2532 	assert(sc != NULL);
2533 	assert(sk != NULL);
2534 	assert(asc != NULL);
2535 	assert(ascq != NULL);
2536 
2537 	switch (bdev_io->internal.status) {
2538 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2539 		*sc = SPDK_SCSI_STATUS_GOOD;
2540 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2541 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2542 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2543 		break;
2544 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2545 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2546 		break;
2547 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2548 		*sc = bdev_io->internal.error.scsi.sc;
2549 		*sk = bdev_io->internal.error.scsi.sk;
2550 		*asc = bdev_io->internal.error.scsi.asc;
2551 		*ascq = bdev_io->internal.error.scsi.ascq;
2552 		break;
2553 	default:
2554 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2555 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2556 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2557 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2558 		break;
2559 	}
2560 }
2561 
2562 void
2563 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2564 {
2565 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2566 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2567 	} else {
2568 		bdev_io->internal.error.nvme.sct = sct;
2569 		bdev_io->internal.error.nvme.sc = sc;
2570 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2571 	}
2572 
2573 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2574 }
2575 
2576 void
2577 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2578 {
2579 	assert(sct != NULL);
2580 	assert(sc != NULL);
2581 
2582 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2583 		*sct = bdev_io->internal.error.nvme.sct;
2584 		*sc = bdev_io->internal.error.nvme.sc;
2585 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2586 		*sct = SPDK_NVME_SCT_GENERIC;
2587 		*sc = SPDK_NVME_SC_SUCCESS;
2588 	} else {
2589 		*sct = SPDK_NVME_SCT_GENERIC;
2590 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2591 	}
2592 }
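
/*
 * Usage sketch from an I/O completion callback (illustrative):
 *
 *	int sct, sc;
 *
 *	spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *	if (sct != SPDK_NVME_SCT_GENERIC || sc != SPDK_NVME_SC_SUCCESS) {
 *		(map the NVMe status onto the transport's error reporting)
 *	}
 */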
2593 
2594 struct spdk_thread *
2595 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2596 {
2597 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
2598 }
2599 
2600 static void
2601 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
2602 			   enum spdk_bdev_qos_type qos_type)
2603 {
2604 	uint64_t	min_qos_set = 0;
2605 
2606 	switch (qos_type) {
2607 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2608 		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
2609 		break;
2610 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2611 		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
2612 		break;
2613 	default:
2614 		SPDK_ERRLOG("Unsupported QoS type.\n");
2615 		return;
2616 	}
2617 
2618 	if (qos_set % min_qos_set) {
2619 		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
2620 			    qos_set, bdev->name, min_qos_set);
2621 		SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
2622 		return;
2623 	}
2624 
2625 	if (!bdev->internal.qos) {
2626 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
2627 		if (!bdev->internal.qos) {
2628 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
2629 			return;
2630 		}
2631 	}
2632 
2633 	switch (qos_type) {
2634 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2635 		bdev->internal.qos->iops_rate_limit = qos_set;
2636 		break;
2637 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2638 		bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024;
2639 		break;
2640 	default:
2641 		break;
2642 	}
2643 
2644 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
2645 		      bdev->name, qos_type, qos_set);
2648 }
2649 
2650 static void
2651 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2652 {
2653 	struct spdk_conf_section	*sp = NULL;
2654 	const char			*val = NULL;
2655 	uint64_t			qos_set = 0;
2656 	int				i = 0, j = 0;
2657 
2658 	sp = spdk_conf_find_section(NULL, "QoS");
2659 	if (!sp) {
2660 		return;
2661 	}
2662 
2663 	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
2664 		i = 0;
2665 		while (true) {
2666 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
2667 			if (!val) {
2668 				break;
2669 			}
2670 
2671 			if (strcmp(bdev->name, val) != 0) {
2672 				i++;
2673 				continue;
2674 			}
2675 
2676 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
2677 			if (val) {
2678 				qos_set = strtoull(val, NULL, 10);
2679 				_spdk_bdev_qos_config_type(bdev, qos_set, j);
2680 			}
2681 
2682 			break;
2683 		}
2684 
2685 		j++;
2686 	}
2689 }
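
/*
 * The section parsed above looks like this in the configuration file
 * ("Malloc0" is an illustrative bdev name, not a requirement):
 *
 *	[QoS]
 *	  # bdev name, then the limit: IOPS in multiples of 10000,
 *	  # bandwidth in MB/s in multiples of 10
 *	  Limit_IOPS Malloc0 20000
 *	  Limit_BWPS Malloc0 100
 */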
2690 
2691 static int
2692 spdk_bdev_init(struct spdk_bdev *bdev)
2693 {
2694 	assert(bdev->module != NULL);
2695 
2696 	if (!bdev->name) {
2697 		SPDK_ERRLOG("Bdev name is NULL\n");
2698 		return -EINVAL;
2699 	}
2700 
2701 	if (spdk_bdev_get_by_name(bdev->name)) {
2702 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2703 		return -EEXIST;
2704 	}
2705 
2706 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
2707 
2708 	TAILQ_INIT(&bdev->internal.open_descs);
2709 
2710 	TAILQ_INIT(&bdev->aliases);
2711 
2712 	bdev->internal.reset_in_progress = NULL;
2713 
2714 	_spdk_bdev_qos_config(bdev);
2715 
2716 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2717 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2718 				sizeof(struct spdk_bdev_channel));
2719 
2720 	pthread_mutex_init(&bdev->internal.mutex, NULL);
2721 	return 0;
2722 }
2723 
2724 static void
2725 spdk_bdev_destroy_cb(void *io_device)
2726 {
2727 	int			rc;
2728 	struct spdk_bdev	*bdev;
2729 	spdk_bdev_unregister_cb	cb_fn;
2730 	void			*cb_arg;
2731 
2732 	bdev = __bdev_from_io_dev(io_device);
2733 	cb_fn = bdev->internal.unregister_cb;
2734 	cb_arg = bdev->internal.unregister_ctx;
2735 
2736 	rc = bdev->fn_table->destruct(bdev->ctxt);
2737 	if (rc < 0) {
2738 		SPDK_ERRLOG("destruct failed\n");
2739 	}
2740 	if (rc <= 0 && cb_fn != NULL) {
2741 		cb_fn(cb_arg, rc);
2742 	}
2743 }
2744 
2745 
2746 static void
2747 spdk_bdev_fini(struct spdk_bdev *bdev)
2748 {
2749 	pthread_mutex_destroy(&bdev->internal.mutex);
2750 
2751 	free(bdev->internal.qos);
2752 
2753 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
2754 }
2755 
2756 static void
2757 spdk_bdev_start(struct spdk_bdev *bdev)
2758 {
2759 	struct spdk_bdev_module *module;
2760 
2761 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2762 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
2763 
2764 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
2765 		if (module->examine) {
2766 			module->internal.action_in_progress++;
2767 			module->examine(bdev);
2768 		}
2769 	}
2770 }
2771 
2772 int
2773 spdk_bdev_register(struct spdk_bdev *bdev)
2774 {
2775 	int rc = spdk_bdev_init(bdev);
2776 
2777 	if (rc == 0) {
2778 		spdk_bdev_start(bdev);
2779 	}
2780 
2781 	return rc;
2782 }
2783 
2784 static void
2785 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2786 {
2787 	struct spdk_bdev **bdevs;
2788 	struct spdk_bdev *base;
2789 	size_t i, j, k;
2790 	bool found;
2791 
2792 	/* Iterate over base bdevs to remove vbdev from them. */
2793 	for (i = 0; i < vbdev->internal.base_bdevs_cnt; i++) {
2794 		found = false;
2795 		base = vbdev->internal.base_bdevs[i];
2796 
2797 		for (j = 0; j < base->vbdevs_cnt; j++) {
2798 			if (base->vbdevs[j] != vbdev) {
2799 				continue;
2800 			}
2801 
2802 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2803 				base->vbdevs[k] = base->vbdevs[k + 1];
2804 			}
2805 
2806 			base->vbdevs_cnt--;
2807 			if (base->vbdevs_cnt > 0) {
2808 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2809 				/* It would be odd if shrinking a memory block failed. */
2810 				assert(bdevs);
2811 				base->vbdevs = bdevs;
2812 			} else {
2813 				free(base->vbdevs);
2814 				base->vbdevs = NULL;
2815 			}
2816 
2817 			found = true;
2818 			break;
2819 		}
2820 
2821 		if (!found) {
2822 			SPDK_WARNLOG("Bdev '%s' is not base bdev of '%s'.\n", base->name, vbdev->name);
2823 		}
2824 	}
2825 
2826 	free(vbdev->internal.base_bdevs);
2827 	vbdev->internal.base_bdevs = NULL;
2828 	vbdev->internal.base_bdevs_cnt = 0;
2829 }
2830 
2831 static int
2832 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2833 {
2834 	struct spdk_bdev **vbdevs;
2835 	struct spdk_bdev *base;
2836 	size_t i;
2837 
2838 	/* Adding base bdevs isn't supported (yet?). */
2839 	assert(vbdev->internal.base_bdevs_cnt == 0);
2840 
2841 	vbdev->internal.base_bdevs = malloc(cnt * sizeof(vbdev->internal.base_bdevs[0]));
2842 	if (!vbdev->internal.base_bdevs) {
2843 		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2844 		return -ENOMEM;
2845 	}
2846 
2847 	memcpy(vbdev->internal.base_bdevs, base_bdevs, cnt * sizeof(vbdev->internal.base_bdevs[0]));
2848 	vbdev->internal.base_bdevs_cnt = cnt;
2849 
2850 	/* Iterate over base bdevs to add this vbdev to them. */
2851 	for (i = 0; i < cnt; i++) {
2852 		base = vbdev->internal.base_bdevs[i];
2853 
2854 		assert(base != NULL);
2855 		assert(base->internal.claim_module != NULL);
2856 
2857 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2858 		if (!vbdevs) {
2859 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2860 			spdk_vbdev_remove_base_bdevs(vbdev);
2861 			return -ENOMEM;
2862 		}
2863 
2864 		vbdevs[base->vbdevs_cnt] = vbdev;
2865 		base->vbdevs = vbdevs;
2866 		base->vbdevs_cnt++;
2867 	}
2868 
2869 	return 0;
2870 }
2871 
2872 int
2873 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2874 {
2875 	int rc;
2876 
2877 	rc = spdk_bdev_init(vbdev);
2878 	if (rc) {
2879 		return rc;
2880 	}
2881 
2882 	if (base_bdev_count == 0) {
2883 		spdk_bdev_start(vbdev);
2884 		return 0;
2885 	}
2886 
2887 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2888 	if (rc) {
2889 		spdk_bdev_fini(vbdev);
2890 		return rc;
2891 	}
2892 
2893 	spdk_bdev_start(vbdev);
2894 	return 0;
2896 }
2897 
2898 void
2899 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
2900 {
2901 	if (bdev->internal.unregister_cb != NULL) {
2902 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
2903 	}
2904 }
2905 
2906 static void
2907 _remove_notify(void *arg)
2908 {
2909 	struct spdk_bdev_desc *desc = arg;
2910 
2911 	desc->remove_cb(desc->remove_ctx);
2912 }
2913 
2914 void
2915 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2916 {
2917 	struct spdk_bdev_desc	*desc, *tmp;
2918 	bool			do_destruct = true;
2919 	struct spdk_thread	*thread;
2920 
2921 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2922 
2923 	thread = spdk_get_thread();
2924 	if (!thread) {
2925 		/* The user called this from a non-SPDK thread. */
2926 		if (cb_fn != NULL) {
2927 			cb_fn(cb_arg, -ENOTSUP);
2928 		}
2929 		return;
2930 	}
2931 
2932 	pthread_mutex_lock(&bdev->internal.mutex);
2933 
2934 	spdk_vbdev_remove_base_bdevs(bdev);
2935 
2936 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
2937 	bdev->internal.unregister_cb = cb_fn;
2938 	bdev->internal.unregister_ctx = cb_arg;
2939 
2940 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
2941 		if (desc->remove_cb) {
2942 			do_destruct = false;
2943 			/*
2944 			 * Defer invocation of the remove_cb to a separate message that will
2945 			 *  run later on this thread.  This ensures this context unwinds and
2946 			 *  we don't recursively unregister this bdev again if the remove_cb
2947 			 *  immediately closes its descriptor.
2948 			 */
2949 			if (!desc->remove_scheduled) {
2950 				/* Avoid scheduling removal of the same descriptor multiple times. */
2951 				desc->remove_scheduled = true;
2952 				spdk_thread_send_msg(thread, _remove_notify, desc);
2953 			}
2954 		}
2955 	}
2956 
2957 	if (!do_destruct) {
2958 		pthread_mutex_unlock(&bdev->internal.mutex);
2959 		return;
2960 	}
2961 
2962 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
2963 	pthread_mutex_unlock(&bdev->internal.mutex);
2964 
2965 	spdk_bdev_fini(bdev);
2966 }
2967 
2968 int
2969 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2970 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2971 {
2972 	struct spdk_bdev_desc *desc;
2973 
2974 	desc = calloc(1, sizeof(*desc));
2975 	if (desc == NULL) {
2976 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2977 		return -ENOMEM;
2978 	}
2979 
2980 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
2981 		      spdk_get_thread());
2982 
2983 	pthread_mutex_lock(&bdev->internal.mutex);
2984 
2985 	if (write && bdev->internal.claim_module) {
2986 		SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name);
2987 		free(desc);
2988 		pthread_mutex_unlock(&bdev->internal.mutex);
2989 		return -EPERM;
2990 	}
2991 
2992 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
2993 
2994 	desc->bdev = bdev;
2995 	desc->remove_cb = remove_cb;
2996 	desc->remove_ctx = remove_ctx;
2997 	desc->write = write;
2998 	*_desc = desc;
2999 
3000 	pthread_mutex_unlock(&bdev->internal.mutex);
3001 
3002 	return 0;
3003 }
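
/*
 * Usage sketch (illustrative; "my_hotremove_cb" is a hypothetical name).
 * The remove callback lets the user close the descriptor when the bdev is
 * hot-removed; the unregister path above defers this callback precisely so
 * that it may call spdk_bdev_close():
 *
 *	static void
 *	my_hotremove_cb(void *ctx)
 *	{
 *		struct spdk_bdev_desc **desc = ctx;
 *
 *		spdk_bdev_close(*desc);
 *		*desc = NULL;
 *	}
 *
 *	static struct spdk_bdev_desc *g_desc;
 *
 *	rc = spdk_bdev_open(bdev, true, my_hotremove_cb, &g_desc, &g_desc);
 */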
3004 
3005 void
3006 spdk_bdev_close(struct spdk_bdev_desc *desc)
3007 {
3008 	struct spdk_bdev *bdev = desc->bdev;
3009 	bool do_unregister = false;
3010 
3011 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3012 		      spdk_get_thread());
3013 
3014 	pthread_mutex_lock(&bdev->internal.mutex);
3015 
3016 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3017 	free(desc);
3018 
3019 	/* If no more descriptors, kill QoS channel */
3020 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3021 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3022 			      bdev->name, spdk_get_thread());
3023 
3024 		if (spdk_bdev_qos_destroy(bdev)) {
3025 			/* There isn't anything we can do to recover here. Just let the
3026 			 * old QoS poller keep running. The QoS handling won't change
3027 			 * cores when the user allocates a new channel, but it won't break. */
3028 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3029 		}
3030 	}
3031 
3032 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3033 		do_unregister = true;
3034 	}
3035 	pthread_mutex_unlock(&bdev->internal.mutex);
3036 
3037 	if (do_unregister == true) {
3038 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3039 	}
3040 }
3041 
3042 int
3043 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3044 			    struct spdk_bdev_module *module)
3045 {
3046 	if (bdev->internal.claim_module != NULL) {
3047 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3048 			    bdev->internal.claim_module->name);
3049 		return -EPERM;
3050 	}
3051 
3052 	if (desc && !desc->write) {
3053 		desc->write = true;
3054 	}
3055 
3056 	bdev->internal.claim_module = module;
3057 	return 0;
3058 }
3059 
3060 void
3061 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3062 {
3063 	assert(bdev->internal.claim_module != NULL);
3064 	bdev->internal.claim_module = NULL;
3065 }
3066 
3067 struct spdk_bdev *
3068 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3069 {
3070 	return desc->bdev;
3071 }
3072 
3073 void
3074 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3075 {
3076 	struct iovec *iovs;
3077 	int iovcnt;
3078 
3079 	if (bdev_io == NULL) {
3080 		return;
3081 	}
3082 
3083 	switch (bdev_io->type) {
3084 	case SPDK_BDEV_IO_TYPE_READ:
3085 		iovs = bdev_io->u.bdev.iovs;
3086 		iovcnt = bdev_io->u.bdev.iovcnt;
3087 		break;
3088 	case SPDK_BDEV_IO_TYPE_WRITE:
3089 		iovs = bdev_io->u.bdev.iovs;
3090 		iovcnt = bdev_io->u.bdev.iovcnt;
3091 		break;
3092 	default:
3093 		iovs = NULL;
3094 		iovcnt = 0;
3095 		break;
3096 	}
3097 
3098 	if (iovp) {
3099 		*iovp = iovs;
3100 	}
3101 	if (iovcntp) {
3102 		*iovcntp = iovcnt;
3103 	}
3104 }
3105 
3106 void
3107 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3108 {
3110 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3111 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3112 		assert(false);
3113 	}
3114 
3115 	if (bdev_module->async_init) {
3116 		bdev_module->internal.action_in_progress = 1;
3117 	}
3118 
3119 	/*
3120 	 * Modules with examine callbacks must be initialized first, so they are
3121 	 *  ready to handle examine callbacks from later modules that will
3122 	 *  register physical bdevs.
3123 	 */
3124 	if (bdev_module->examine != NULL) {
3125 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3126 	} else {
3127 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3128 	}
3129 }
3130 
3131 struct spdk_bdev_module *
3132 spdk_bdev_module_list_find(const char *name)
3133 {
3134 	struct spdk_bdev_module *bdev_module;
3135 
3136 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3137 		if (strcmp(name, bdev_module->name) == 0) {
3138 			break;
3139 		}
3140 	}
3141 
3142 	return bdev_module;
3143 }
3144 
3145 static void
3146 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3147 {
3148 	uint64_t len;
3149 
3150 	if (!success) {
3151 		bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb;
3152 		_spdk_bdev_io_complete(bdev_io);
3153 		return;
3154 	}
3155 
3156 	/* No need to perform the error checking from write_zeroes_blocks because this request already passed those checks. */
3157 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
3158 		       ZERO_BUFFER_SIZE);
3159 
3160 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
3161 	bdev_io->u.bdev.iov.iov_len = len;
3162 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
3163 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
3164 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
3165 
3166 	/* If this round completes the I/O, change the callback to the original user callback. */
3167 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
3168 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
3169 	} else {
3170 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
3171 	}
3172 	spdk_bdev_io_submit(bdev_io);
3173 }
3174 
3175 struct set_qos_limit_ctx {
3176 	void (*cb_fn)(void *cb_arg, int status);
3177 	void *cb_arg;
3178 	struct spdk_bdev *bdev;
3179 };
3180 
3181 static void
3182 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3183 {
3184 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3185 	ctx->bdev->internal.qos_mod_in_progress = false;
3186 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3187 
3188 	ctx->cb_fn(ctx->cb_arg, status);
3189 	free(ctx);
3190 }
3191 
3192 static void
3193 _spdk_bdev_disable_qos_done(void *cb_arg)
3194 {
3195 	struct set_qos_limit_ctx *ctx = cb_arg;
3196 	struct spdk_bdev *bdev = ctx->bdev;
3197 	struct spdk_bdev_qos *qos;
3198 
3199 	pthread_mutex_lock(&bdev->internal.mutex);
3200 	qos = bdev->internal.qos;
3201 	bdev->internal.qos = NULL;
3202 	pthread_mutex_unlock(&bdev->internal.mutex);
3203 
3204 	_spdk_bdev_abort_queued_io(&qos->queued, qos->ch);
3205 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3206 	spdk_poller_unregister(&qos->poller);
3207 
3208 	free(qos);
3209 
3210 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3211 }
3212 
3213 static void
3214 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3215 {
3216 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3217 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3218 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3219 	struct spdk_thread *thread;
3220 
3221 	pthread_mutex_lock(&bdev->internal.mutex);
3222 	thread = bdev->internal.qos->thread;
3223 	pthread_mutex_unlock(&bdev->internal.mutex);
3224 
3225 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3226 }
3227 
3228 static void
3229 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3230 {
3231 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3232 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3233 
3234 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3235 
3236 	spdk_for_each_channel_continue(i, 0);
3237 }
3238 
3239 static void
3240 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
3241 {
3242 	struct set_qos_limit_ctx *ctx = cb_arg;
3243 	struct spdk_bdev *bdev = ctx->bdev;
3244 
3245 	pthread_mutex_lock(&bdev->internal.mutex);
3246 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3247 	pthread_mutex_unlock(&bdev->internal.mutex);
3248 
3249 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3250 }
3251 
3252 static void
3253 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3254 {
3255 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3256 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3257 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3258 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3259 	int rc;
3260 
3261 	pthread_mutex_lock(&bdev->internal.mutex);
3262 	rc = _spdk_bdev_enable_qos(bdev, bdev_ch);
3263 	pthread_mutex_unlock(&bdev->internal.mutex);
3264 	spdk_for_each_channel_continue(i, rc);
3265 }
3266 
3267 static void
3268 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3269 {
3270 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3271 
3272 	_spdk_bdev_set_qos_limit_done(ctx, status);
3273 }
3274 
3275 void
3276 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
3277 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3278 {
3279 	struct set_qos_limit_ctx *ctx;
3280 
3281 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
3282 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
3283 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
3284 		cb_fn(cb_arg, -EINVAL);
3285 		return;
3286 	}
3287 
3288 	ctx = calloc(1, sizeof(*ctx));
3289 	if (ctx == NULL) {
3290 		cb_fn(cb_arg, -ENOMEM);
3291 		return;
3292 	}
3293 
3294 	ctx->cb_fn = cb_fn;
3295 	ctx->cb_arg = cb_arg;
3296 	ctx->bdev = bdev;
3297 
3298 	pthread_mutex_lock(&bdev->internal.mutex);
3299 	if (bdev->internal.qos_mod_in_progress) {
3300 		pthread_mutex_unlock(&bdev->internal.mutex);
3301 		free(ctx);
3302 		cb_fn(cb_arg, -EAGAIN);
3303 		return;
3304 	}
3305 	bdev->internal.qos_mod_in_progress = true;
3306 
3307 	if (ios_per_sec > 0) {
3308 		if (bdev->internal.qos == NULL) {
3309 			/* Enabling */
3310 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3311 			if (!bdev->internal.qos) {
3312 				pthread_mutex_unlock(&bdev->internal.mutex);
3313 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3314 				free(ctx);
3315 				cb_fn(cb_arg, -ENOMEM);
3316 				return;
3317 			}
3318 
3319 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3320 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3321 					      _spdk_bdev_enable_qos_msg, ctx,
3322 					      _spdk_bdev_enable_qos_done);
3323 		} else {
3324 			/* Updating */
3325 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3326 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3327 		}
3328 	} else {
3329 		if (bdev->internal.qos != NULL) {
3330 			/* Disabling */
3331 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3332 					      _spdk_bdev_disable_qos_msg, ctx,
3333 					      _spdk_bdev_disable_qos_msg_done);
3334 		} else {
3335 			pthread_mutex_unlock(&bdev->internal.mutex);
3336 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3337 			return;
3338 		}
3339 	}
3340 
3341 	pthread_mutex_unlock(&bdev->internal.mutex);
3342 }
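
/*
 * Usage sketch (illustrative; "my_qos_done" is a hypothetical name).
 * A value of 0 disables the limit; nonzero values must be a multiple of
 * SPDK_BDEV_QOS_MIN_IOS_PER_SEC:
 *
 *	static void
 *	my_qos_done(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			(the limit was not applied)
 *		}
 *	}
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, my_qos_done, NULL);
 */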
3343 
3344 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3345