xref: /spdk/lib/bdev/bdev.c (revision 17e9d38c5b101ec705e80cd015d93df97e267f03)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/thread.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk/bdev_module.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE			256
60 #define BUF_SMALL_POOL_SIZE			8192
61 #define BUF_LARGE_POOL_SIZE			1024
62 #define NOMEM_THRESHOLD_COUNT			8
63 #define ZERO_BUFFER_SIZE			0x100000
64 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
65 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
66 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
67 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
68 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10
69 
70 enum spdk_bdev_qos_type {
71 	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
72 	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
73 	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
74 };
75 
76 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};
77 
78 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
79 
80 struct spdk_bdev_mgr {
81 	struct spdk_mempool *bdev_io_pool;
82 
83 	struct spdk_mempool *buf_small_pool;
84 	struct spdk_mempool *buf_large_pool;
85 
86 	void *zero_buffer;
87 
88 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
89 
90 	struct spdk_bdev_list bdevs;
91 
92 	bool init_complete;
93 	bool module_init_complete;
94 
95 #ifdef SPDK_CONFIG_VTUNE
96 	__itt_domain	*domain;
97 #endif
98 };
99 
100 static struct spdk_bdev_mgr g_bdev_mgr = {
101 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
102 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
103 	.init_complete = false,
104 	.module_init_complete = false,
105 };
106 
107 static struct spdk_bdev_opts	g_bdev_opts = {
108 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
109 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
110 };
111 
112 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
113 static void			*g_init_cb_arg = NULL;
114 
115 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
116 static void			*g_fini_cb_arg = NULL;
117 static struct spdk_thread	*g_fini_thread = NULL;
118 
119 struct spdk_bdev_qos {
120 	/** Rate limit, in I/O per second */
121 	uint64_t iops_rate_limit;
122 
123 	/** Rate limit, in byte per second */
124 	uint64_t byte_rate_limit;
125 
126 	/** The channel that all I/O are funneled through */
127 	struct spdk_bdev_channel *ch;
128 
129 	/** The thread on which the poller is running. */
130 	struct spdk_thread *thread;
131 
132 	/** Queue of I/O waiting to be issued. */
133 	bdev_io_tailq_t queued;
134 
135 	/** Maximum number of IOs allowed to be issued in one timeslice (e.g., 1ms);
136 	 *  only valid for the master channel, which manages the outstanding IOs. */
137 	uint64_t max_ios_per_timeslice;
138 
139 	/** Maximum number of bytes allowed to be issued in one timeslice (e.g., 1ms);
140 	 *  only valid for the master channel, which manages the outstanding IOs. */
141 	uint64_t max_byte_per_timeslice;
142 
143 	/** Remaining IO allowed in current timeslice (e.g., 1ms) */
144 	uint64_t io_remaining_this_timeslice;
145 
146 	/** Remaining bytes allowed in current timeslice (e.g., 1ms).
147 	 *  Allowed to run negative if an I/O is submitted when some bytes are remaining,
148 	 *  but the I/O is bigger than that amount.  The excess will be deducted from the
149 	 *  next timeslice.
150 	 */
151 	int64_t byte_remaining_this_timeslice;
152 
153 	/** Poller that processes queued I/O commands each time slice. */
154 	struct spdk_poller *poller;
155 };
156 
157 struct spdk_bdev_mgmt_channel {
158 	bdev_io_stailq_t need_buf_small;
159 	bdev_io_stailq_t need_buf_large;
160 
161 	/*
162 	 * Each thread keeps a cache of bdev_io - this allows
163 	 *  bdev threads which are *not* DPDK threads to still
164 	 *  benefit from a per-thread bdev_io cache.  Without
165 	 *  this, non-DPDK threads fetching from the mempool
166 	 *  incur a cmpxchg on get and put.
167 	 */
168 	bdev_io_stailq_t per_thread_cache;
169 	uint32_t	per_thread_cache_count;
170 	uint32_t	bdev_io_cache_size;
171 
172 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
173 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
174 };
175 
176 /*
177  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
178  * will queue their IO awaiting retry here. This makes it possible to retry sending
179  * IO to one bdev after IO from another bdev completes.
180  */
181 struct spdk_bdev_shared_resource {
182 	/* The bdev management channel */
183 	struct spdk_bdev_mgmt_channel *mgmt_ch;
184 
185 	/*
186 	 * Count of I/O submitted to bdev module and waiting for completion.
187 	 * Incremented before submit_request() is called on an spdk_bdev_io.
188 	 */
189 	uint64_t		io_outstanding;
190 
191 	/*
192 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
193 	 *  on this channel.
194 	 */
195 	bdev_io_tailq_t		nomem_io;
196 
197 	/*
198 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
199 	 */
200 	uint64_t		nomem_threshold;
201 
202 	/* I/O channel allocated by a bdev module */
203 	struct spdk_io_channel	*shared_ch;
204 
205 	/* Refcount of bdev channels using this resource */
206 	uint32_t		ref;
207 
208 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
209 };
210 
211 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
212 #define BDEV_CH_QOS_ENABLED		(1 << 1)
213 
214 struct spdk_bdev_channel {
215 	struct spdk_bdev	*bdev;
216 
217 	/* The channel for the underlying device */
218 	struct spdk_io_channel	*channel;
219 
220 	/* Per io_device per thread data */
221 	struct spdk_bdev_shared_resource *shared_resource;
222 
223 	struct spdk_bdev_io_stat stat;
224 
225 	/*
226 	 * Count of I/O submitted through this channel and waiting for completion.
227 	 * Incremented before submit_request() is called on an spdk_bdev_io.
228 	 */
229 	uint64_t		io_outstanding;
230 
231 	bdev_io_tailq_t		queued_resets;
232 
233 	uint32_t		flags;
234 
235 #ifdef SPDK_CONFIG_VTUNE
236 	uint64_t		start_tsc;
237 	uint64_t		interval_tsc;
238 	__itt_string_handle	*handle;
239 	struct spdk_bdev_io_stat prev_stat;
240 #endif
241 
242 };
243 
244 struct spdk_bdev_desc {
245 	struct spdk_bdev		*bdev;
246 	spdk_bdev_remove_cb_t		remove_cb;
247 	void				*remove_ctx;
248 	bool				remove_scheduled;
249 	bool				write;
250 	TAILQ_ENTRY(spdk_bdev_desc)	link;
251 };
252 
253 struct spdk_bdev_iostat_ctx {
254 	struct spdk_bdev_io_stat *stat;
255 	spdk_bdev_get_device_stat_cb cb;
256 	void *cb_arg;
257 };
258 
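/*
 * Convert a bdev to/from the handle registered with the io_device layer.  The
 * one-byte offset keeps the handle distinct from the bdev pointer itself, which
 * a bdev module may register as its own io_device.
 */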
259 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
260 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
261 
262 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
263 		void *cb_arg);
264 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
265 
266 void
267 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
268 {
269 	*opts = g_bdev_opts;
270 }
271 
272 int
273 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
274 {
275 	uint32_t min_pool_size;
276 
277 	/*
278 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
279 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
280 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
281 	 */
282 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
283 	if (opts->bdev_io_pool_size < min_pool_size) {
284 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
285 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
286 			    spdk_thread_get_count());
287 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
288 		return -1;
289 	}
290 
291 	g_bdev_opts = *opts;
292 	return 0;
293 }
294 
295 struct spdk_bdev *
296 spdk_bdev_first(void)
297 {
298 	struct spdk_bdev *bdev;
299 
300 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
301 	if (bdev) {
302 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
303 	}
304 
305 	return bdev;
306 }
307 
308 struct spdk_bdev *
309 spdk_bdev_next(struct spdk_bdev *prev)
310 {
311 	struct spdk_bdev *bdev;
312 
313 	bdev = TAILQ_NEXT(prev, internal.link);
314 	if (bdev) {
315 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
316 	}
317 
318 	return bdev;
319 }
320 
321 static struct spdk_bdev *
322 _bdev_next_leaf(struct spdk_bdev *bdev)
323 {
324 	while (bdev != NULL) {
325 		if (bdev->internal.claim_module == NULL) {
326 			return bdev;
327 		} else {
328 			bdev = TAILQ_NEXT(bdev, internal.link);
329 		}
330 	}
331 
332 	return bdev;
333 }
334 
335 struct spdk_bdev *
336 spdk_bdev_first_leaf(void)
337 {
338 	struct spdk_bdev *bdev;
339 
340 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
341 
342 	if (bdev) {
343 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
344 	}
345 
346 	return bdev;
347 }
348 
349 struct spdk_bdev *
350 spdk_bdev_next_leaf(struct spdk_bdev *prev)
351 {
352 	struct spdk_bdev *bdev;
353 
354 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
355 
356 	if (bdev) {
357 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
358 	}
359 
360 	return bdev;
361 }
362 
363 struct spdk_bdev *
364 spdk_bdev_get_by_name(const char *bdev_name)
365 {
366 	struct spdk_bdev_alias *tmp;
367 	struct spdk_bdev *bdev = spdk_bdev_first();
368 
369 	while (bdev != NULL) {
370 		if (strcmp(bdev_name, bdev->name) == 0) {
371 			return bdev;
372 		}
373 
374 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
375 			if (strcmp(bdev_name, tmp->alias) == 0) {
376 				return bdev;
377 			}
378 		}
379 
380 		bdev = spdk_bdev_next(bdev);
381 	}
382 
383 	return NULL;
384 }
385 
386 void
387 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
388 {
389 	struct iovec *iovs;
390 
391 	iovs = bdev_io->u.bdev.iovs;
392 
393 	assert(iovs != NULL);
394 	assert(bdev_io->u.bdev.iovcnt >= 1);
395 
396 	iovs[0].iov_base = buf;
397 	iovs[0].iov_len = len;
398 }
399 
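/*
 * Return the data buffer owned by this bdev_io to its pool, or, if another I/O
 * is already waiting for a buffer of this size class, hand the buffer to that
 * waiting I/O and invoke its get_buf callback directly.
 */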
400 static void
401 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
402 {
403 	struct spdk_mempool *pool;
404 	struct spdk_bdev_io *tmp;
405 	void *buf, *aligned_buf;
406 	bdev_io_stailq_t *stailq;
407 	struct spdk_bdev_mgmt_channel *ch;
408 
409 	assert(bdev_io->u.bdev.iovcnt == 1);
410 
411 	buf = bdev_io->internal.buf;
412 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
413 
414 	bdev_io->internal.buf = NULL;
415 
416 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
417 		pool = g_bdev_mgr.buf_small_pool;
418 		stailq = &ch->need_buf_small;
419 	} else {
420 		pool = g_bdev_mgr.buf_large_pool;
421 		stailq = &ch->need_buf_large;
422 	}
423 
424 	if (STAILQ_EMPTY(stailq)) {
425 		spdk_mempool_put(pool, buf);
426 	} else {
427 		tmp = STAILQ_FIRST(stailq);
428 
429 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
430 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, tmp->internal.buf_len);
431 
432 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
433 		tmp->internal.buf = buf;
434 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
435 	}
436 }
437 
438 void
439 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
440 {
441 	struct spdk_mempool *pool;
442 	bdev_io_stailq_t *stailq;
443 	void *buf, *aligned_buf;
444 	struct spdk_bdev_mgmt_channel *mgmt_ch;
445 
446 	assert(cb != NULL);
447 	assert(bdev_io->u.bdev.iovs != NULL);
448 
449 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
450 		/* Buffer already present */
451 		cb(bdev_io->internal.ch->channel, bdev_io);
452 		return;
453 	}
454 
455 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
456 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
457 
458 	bdev_io->internal.buf_len = len;
459 	bdev_io->internal.get_buf_cb = cb;
460 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
461 		pool = g_bdev_mgr.buf_small_pool;
462 		stailq = &mgmt_ch->need_buf_small;
463 	} else {
464 		pool = g_bdev_mgr.buf_large_pool;
465 		stailq = &mgmt_ch->need_buf_large;
466 	}
467 
468 	buf = spdk_mempool_get(pool);
469 
470 	if (!buf) {
471 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
472 	} else {
473 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
474 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
475 
476 		bdev_io->internal.buf = buf;
477 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
478 	}
479 }
480 
481 static int
482 spdk_bdev_module_get_max_ctx_size(void)
483 {
484 	struct spdk_bdev_module *bdev_module;
485 	int max_bdev_module_size = 0;
486 
487 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
488 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
489 			max_bdev_module_size = bdev_module->get_ctx_size();
490 		}
491 	}
492 
493 	return max_bdev_module_size;
494 }
495 
496 void
497 spdk_bdev_config_text(FILE *fp)
498 {
499 	struct spdk_bdev_module *bdev_module;
500 
501 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
502 		if (bdev_module->config_text) {
503 			bdev_module->config_text(fp);
504 		}
505 	}
506 }
507 
508 void
509 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
510 {
511 	struct spdk_bdev_module *bdev_module;
512 	struct spdk_bdev *bdev;
513 
514 	assert(w != NULL);
515 
516 	spdk_json_write_array_begin(w);
517 
518 	spdk_json_write_object_begin(w);
519 	spdk_json_write_named_string(w, "method", "set_bdev_options");
520 	spdk_json_write_name(w, "params");
521 	spdk_json_write_object_begin(w);
522 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
523 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
524 	spdk_json_write_object_end(w);
525 	spdk_json_write_object_end(w);
526 
527 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
528 		if (bdev_module->config_json) {
529 			bdev_module->config_json(w);
530 		}
531 	}
532 
533 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
534 		spdk_bdev_config_json(bdev, w);
535 	}
536 
537 	spdk_json_write_array_end(w);
538 }
539 
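/*
 * Constructor for the per-thread management channel.  Pre-populates the
 * per-thread bdev_io cache so this thread cannot be starved by other threads
 * draining the global pool.
 */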
540 static int
541 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
542 {
543 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
544 	struct spdk_bdev_io *bdev_io;
545 	uint32_t i;
546 
547 	STAILQ_INIT(&ch->need_buf_small);
548 	STAILQ_INIT(&ch->need_buf_large);
549 
550 	STAILQ_INIT(&ch->per_thread_cache);
551 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
552 
553 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
554 	ch->per_thread_cache_count = 0;
555 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
556 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
557 		assert(bdev_io != NULL);
558 		ch->per_thread_cache_count++;
559 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
560 	}
561 
562 	TAILQ_INIT(&ch->shared_resources);
563 	TAILQ_INIT(&ch->io_wait_queue);
564 
565 	return 0;
566 }
567 
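/*
 * Destructor for the per-thread management channel.  Returns cached bdev_io
 * structures to the global pool and warns if any pending lists are not empty.
 */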
568 static void
569 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
570 {
571 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
572 	struct spdk_bdev_io *bdev_io;
573 
574 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
575 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
576 	}
577 
578 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
579 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
580 	}
581 
582 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
583 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
584 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
585 		ch->per_thread_cache_count--;
586 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
587 	}
588 
589 	assert(ch->per_thread_cache_count == 0);
590 }
591 
592 static void
593 spdk_bdev_init_complete(int rc)
594 {
595 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
596 	void *cb_arg = g_init_cb_arg;
597 	struct spdk_bdev_module *m;
598 
599 	g_bdev_mgr.init_complete = true;
600 	g_init_cb_fn = NULL;
601 	g_init_cb_arg = NULL;
602 
603 	/*
604 	 * For modules that need to know when subsystem init is complete,
605 	 * inform them now.
606 	 */
607 	if (rc == 0) {
608 		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
609 			if (m->init_complete) {
610 				m->init_complete();
611 			}
612 		}
613 	}
614 
615 	cb_fn(cb_arg, rc);
616 }
617 
618 static void
619 spdk_bdev_module_action_complete(void)
620 {
621 	struct spdk_bdev_module *m;
622 
623 	/*
624 	 * Don't finish bdev subsystem initialization if
625 	 * module pre-initialization is still in progress, or
626 	 * the subsystem has already been initialized.
627 	 */
628 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
629 		return;
630 	}
631 
632 	/*
633 	 * Check all bdev modules for inits/examinations in progress. If any
634 	 * exist, return immediately since we cannot finish bdev subsystem
635 	 * initialization until all are completed.
636 	 */
637 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
638 		if (m->internal.action_in_progress > 0) {
639 			return;
640 		}
641 	}
642 
643 	/*
644 	 * Modules already finished initialization - now that all
645 	 * the bdev modules have finished their asynchronous I/O
646 	 * processing, the entire bdev layer can be marked as complete.
647 	 */
648 	spdk_bdev_init_complete(0);
649 }
650 
651 static void
652 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
653 {
654 	assert(module->internal.action_in_progress > 0);
655 	module->internal.action_in_progress--;
656 	spdk_bdev_module_action_complete();
657 }
658 
659 void
660 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
661 {
662 	spdk_bdev_module_action_done(module);
663 }
664 
665 void
666 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
667 {
668 	spdk_bdev_module_action_done(module);
669 }
670 
671 static int
672 spdk_bdev_modules_init(void)
673 {
674 	struct spdk_bdev_module *module;
675 	int rc = 0;
676 
677 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
678 		rc = module->module_init();
679 		if (rc != 0) {
680 			break;
681 		}
682 	}
683 
684 	g_bdev_mgr.module_init_complete = true;
685 	return rc;
686 }
687 
688 
689 static void
690 spdk_bdev_init_failed_complete(void *cb_arg)
691 {
692 	spdk_bdev_init_complete(-1);
693 }
694 
695 static void
696 spdk_bdev_init_failed(void *cb_arg)
697 {
698 	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
699 }
700 
701 void
702 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
703 {
704 	struct spdk_conf_section *sp;
705 	struct spdk_bdev_opts bdev_opts;
706 	int32_t bdev_io_pool_size, bdev_io_cache_size;
707 	int cache_size;
708 	int rc = 0;
709 	char mempool_name[32];
710 
711 	assert(cb_fn != NULL);
712 
713 	sp = spdk_conf_find_section(NULL, "Bdev");
714 	if (sp != NULL) {
715 		spdk_bdev_get_opts(&bdev_opts);
716 
717 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
718 		if (bdev_io_pool_size >= 0) {
719 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
720 		}
721 
722 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
723 		if (bdev_io_cache_size >= 0) {
724 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
725 		}
726 
727 		if (spdk_bdev_set_opts(&bdev_opts)) {
728 			spdk_bdev_init_complete(-1);
729 			return;
730 		}
731 
732 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
733 	}
734 
735 	g_init_cb_fn = cb_fn;
736 	g_init_cb_arg = cb_arg;
737 
738 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
739 
740 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
741 				  g_bdev_opts.bdev_io_pool_size,
742 				  sizeof(struct spdk_bdev_io) +
743 				  spdk_bdev_module_get_max_ctx_size(),
744 				  0,
745 				  SPDK_ENV_SOCKET_ID_ANY);
746 
747 	if (g_bdev_mgr.bdev_io_pool == NULL) {
748 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
749 		spdk_bdev_init_complete(-1);
750 		return;
751 	}
752 
753 	/**
754 	 * Ensure no more than half of the total buffers end up in local caches, by
755 	 *   using spdk_thread_get_count() to determine how many local caches we need
756 	 *   to account for.
757 	 */
758 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
759 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
760 
761 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
762 				    BUF_SMALL_POOL_SIZE,
763 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
764 				    cache_size,
765 				    SPDK_ENV_SOCKET_ID_ANY);
766 	if (!g_bdev_mgr.buf_small_pool) {
767 		SPDK_ERRLOG("create rbuf small pool failed\n");
768 		spdk_bdev_init_complete(-1);
769 		return;
770 	}
771 
772 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
773 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
774 
775 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
776 				    BUF_LARGE_POOL_SIZE,
777 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
778 				    cache_size,
779 				    SPDK_ENV_SOCKET_ID_ANY);
780 	if (!g_bdev_mgr.buf_large_pool) {
781 		SPDK_ERRLOG("create rbuf large pool failed\n");
782 		spdk_bdev_init_complete(-1);
783 		return;
784 	}
785 
786 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
787 				 NULL);
788 	if (!g_bdev_mgr.zero_buffer) {
789 		SPDK_ERRLOG("create bdev zero buffer failed\n");
790 		spdk_bdev_init_complete(-1);
791 		return;
792 	}
793 
794 #ifdef SPDK_CONFIG_VTUNE
795 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
796 #endif
797 
798 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
799 				spdk_bdev_mgmt_channel_destroy,
800 				sizeof(struct spdk_bdev_mgmt_channel));
801 
802 	rc = spdk_bdev_modules_init();
803 	if (rc != 0) {
804 		SPDK_ERRLOG("bdev modules init failed\n");
805 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
806 		return;
807 	}
808 
809 	spdk_bdev_module_action_complete();
810 }
811 
812 static void
813 spdk_bdev_mgr_unregister_cb(void *io_device)
814 {
815 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
816 
817 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
818 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
819 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
820 			    g_bdev_opts.bdev_io_pool_size);
821 	}
822 
823 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
824 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
825 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
826 			    BUF_SMALL_POOL_SIZE);
827 		assert(false);
828 	}
829 
830 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
831 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
832 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
833 			    BUF_LARGE_POOL_SIZE);
834 		assert(false);
835 	}
836 
837 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
838 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
839 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
840 	spdk_dma_free(g_bdev_mgr.zero_buffer);
841 
842 	cb_fn(g_fini_cb_arg);
843 	g_fini_cb_fn = NULL;
844 	g_fini_cb_arg = NULL;
845 }
846 
847 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
848 
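/*
 * Call module_fini() on each registered bdev module.  Modules with async_fini
 * set pause the iteration here; it resumes from g_resume_bdev_module when the
 * module calls spdk_bdev_module_finish_done().
 */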
849 static void
850 spdk_bdev_module_finish_iter(void *arg)
851 {
852 	struct spdk_bdev_module *bdev_module;
853 
854 	/* Start iterating from the last touched module */
855 	if (!g_resume_bdev_module) {
856 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
857 	} else {
858 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
859 	}
860 
861 	while (bdev_module) {
862 		if (bdev_module->async_fini) {
863 			/* Save our place so we can resume later. We must
864 			 * save the variable here, before calling module_fini()
865 			 * below, because in some cases the module may immediately
866 			 * call spdk_bdev_module_finish_done() and re-enter
867 			 * this function to continue iterating. */
868 			g_resume_bdev_module = bdev_module;
869 		}
870 
871 		if (bdev_module->module_fini) {
872 			bdev_module->module_fini();
873 		}
874 
875 		if (bdev_module->async_fini) {
876 			return;
877 		}
878 
879 		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
880 	}
881 
882 	g_resume_bdev_module = NULL;
883 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
884 }
885 
886 void
887 spdk_bdev_module_finish_done(void)
888 {
889 	if (spdk_get_thread() != g_fini_thread) {
890 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
891 	} else {
892 		spdk_bdev_module_finish_iter(NULL);
893 	}
894 }
895 
896 static void
897 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
898 {
899 	struct spdk_bdev *bdev = cb_arg;
900 
901 	if (bdeverrno && bdev) {
902 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
903 			     bdev->name);
904 
905 		/*
906 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
907 		 *  bdev; try to recover by manually removing this bdev from the list and continuing
908 		 *  with the next bdev in the list.
909 		 */
910 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
911 	}
912 
913 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
914 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
915 		/*
916 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
917 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
918 		 * after returning.
919 		 */
920 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
921 		return;
922 	}
923 
924 	/*
925 	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
926 	 * that has no bdevs that depend on it.
927 	 */
928 	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
929 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
930 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
931 }
932 
933 void
934 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
935 {
936 	struct spdk_bdev_module *m;
937 
938 	assert(cb_fn != NULL);
939 
940 	g_fini_thread = spdk_get_thread();
941 
942 	g_fini_cb_fn = cb_fn;
943 	g_fini_cb_arg = cb_arg;
944 
945 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
946 		if (m->fini_start) {
947 			m->fini_start();
948 		}
949 	}
950 
951 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
952 }
953 
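/*
 * Allocate a bdev_io, preferring the per-thread cache.  Returns NULL if the
 * cache is empty and either there are already waiters queued for bdev_ios or
 * the global pool is exhausted.
 */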
954 static struct spdk_bdev_io *
955 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
956 {
957 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
958 	struct spdk_bdev_io *bdev_io;
959 
960 	if (ch->per_thread_cache_count > 0) {
961 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
962 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
963 		ch->per_thread_cache_count--;
964 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
965 		/*
966 		 * Don't try to look for bdev_ios in the global pool if there are
967 		 * waiters on bdev_ios - we don't want this caller to jump the line.
968 		 */
969 		bdev_io = NULL;
970 	} else {
971 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
972 	}
973 
974 	return bdev_io;
975 }
976 
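/*
 * Release a completed bdev_io.  Its data buffer (if any) is returned first,
 * then the bdev_io goes back to the per-thread cache, waking any io_wait
 * waiters, or to the global pool if the cache is already full.
 */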
977 void
978 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
979 {
980 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
981 
982 	assert(bdev_io != NULL);
983 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
984 
985 	if (bdev_io->internal.buf != NULL) {
986 		spdk_bdev_io_put_buf(bdev_io);
987 	}
988 
989 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
990 		ch->per_thread_cache_count++;
991 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
992 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
993 			struct spdk_bdev_io_wait_entry *entry;
994 
995 			entry = TAILQ_FIRST(&ch->io_wait_queue);
996 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
997 			entry->cb_fn(entry->cb_arg);
998 		}
999 	} else {
1000 		/* We should never have a full cache with entries on the io wait queue. */
1001 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1002 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1003 	}
1004 }
1005 
1006 static uint64_t
1007 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1008 {
1009 	struct spdk_bdev	*bdev = bdev_io->bdev;
1010 
1011 	switch (bdev_io->type) {
1012 	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
1013 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1014 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1015 		return bdev_io->u.nvme_passthru.nbytes;
1016 	case SPDK_BDEV_IO_TYPE_READ:
1017 	case SPDK_BDEV_IO_TYPE_WRITE:
1018 	case SPDK_BDEV_IO_TYPE_UNMAP:
1019 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1020 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1021 	default:
1022 		return 0;
1023 	}
1024 }
1025 
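/*
 * Submit I/O queued on the QoS channel until the queue is empty or the IOPS or
 * byte quota for the current timeslice is exhausted.
 */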
1026 static void
1027 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
1028 {
1029 	struct spdk_bdev_io		*bdev_io = NULL;
1030 	struct spdk_bdev		*bdev = ch->bdev;
1031 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
1032 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1033 
1034 	while (!TAILQ_EMPTY(&qos->queued)) {
1035 		if (qos->max_ios_per_timeslice > 0 && qos->io_remaining_this_timeslice == 0) {
1036 			break;
1037 		}
1038 
1039 		if (qos->max_byte_per_timeslice > 0 && qos->byte_remaining_this_timeslice <= 0) {
1040 			break;
1041 		}
1042 
1043 		bdev_io = TAILQ_FIRST(&qos->queued);
1044 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1045 		qos->io_remaining_this_timeslice--;
1046 		qos->byte_remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(bdev_io);
1047 		ch->io_outstanding++;
1048 		shared_resource->io_outstanding++;
1049 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1050 	}
1051 }
1052 
1053 static bool
1054 _spdk_bdev_io_type_can_split(uint8_t type)
1055 {
1056 	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1057 	assert(type < SPDK_BDEV_NUM_IO_TYPES);
1058 
1059 	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
1060 	 * UNMAP could be split, but these types of I/O are typically much larger
1061 	 * in size (sometimes the size of the entire block device), and the bdev
1062 	 * module can more efficiently split these types of I/O.  Plus those types
1063 	 * of I/O do not have a payload, which makes the splitting process simpler.
1064 	 */
1065 	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1066 		return true;
1067 	} else {
1068 		return false;
1069 	}
1070 }
1071 
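/* Return true if this I/O crosses an optimal_io_boundary stripe and therefore must be split. */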
1072 static bool
1073 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1074 {
1075 	uint64_t start_stripe, end_stripe;
1076 	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1077 
1078 	if (io_boundary == 0) {
1079 		return false;
1080 	}
1081 
1082 	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1083 		return false;
1084 	}
1085 
1086 	start_stripe = bdev_io->u.bdev.offset_blocks;
1087 	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1088 	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
1089 	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1090 		start_stripe >>= spdk_u32log2(io_boundary);
1091 		end_stripe >>= spdk_u32log2(io_boundary);
1092 	} else {
1093 		start_stripe /= io_boundary;
1094 		end_stripe /= io_boundary;
1095 	}
1096 	return (start_stripe != end_stripe);
1097 }
1098 
1099 static uint32_t
1100 _to_next_boundary(uint64_t offset, uint32_t boundary)
1101 {
1102 	return (boundary - (offset % boundary));
1103 }
1104 
1105 static void
1106 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1107 
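/*
 * Build child iovecs covering the parent payload up to the next optimal I/O
 * boundary and submit a single child read or write.  On -ENOMEM the split is
 * retried later via the io_wait queue.
 */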
1108 static void
1109 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1110 {
1111 	struct spdk_bdev_io *bdev_io = _bdev_io;
1112 	uint64_t current_offset, remaining, bytes_handled;
1113 	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1114 	struct iovec *parent_iov;
1115 	uint64_t parent_iov_offset, child_iov_len;
1116 	uint32_t child_iovcnt;
1117 	int rc;
1118 
1119 	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1120 	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1121 	blocklen = bdev_io->bdev->blocklen;
1122 	bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1123 	parent_iov = &bdev_io->u.bdev.iovs[0];
1124 	parent_iov_offset = 0;
1125 
1126 	while (bytes_handled > 0) {
1127 		if (bytes_handled >= parent_iov->iov_len) {
1128 			bytes_handled -= parent_iov->iov_len;
1129 			parent_iov++;
1130 			continue;
1131 		}
1132 		parent_iov_offset += bytes_handled;
1133 		break;
1134 	}
1135 
1136 	to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1137 	to_next_boundary = spdk_min(remaining, to_next_boundary);
1138 	to_next_boundary_bytes = to_next_boundary * blocklen;
1139 	child_iovcnt = 0;
1140 	while (to_next_boundary_bytes > 0) {
1141 		child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1142 		to_next_boundary_bytes -= child_iov_len;
1143 
1144 		bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1145 		bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len;
1146 
1147 		parent_iov++;
1148 		parent_iov_offset = 0;
1149 		child_iovcnt++;
1150 		if (child_iovcnt == BDEV_IO_NUM_CHILD_IOV && to_next_boundary_bytes > 0) {
1151 			/* We've run out of child iovs - we need to fail this I/O. */
1152 			bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1153 			bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED,
1154 					     bdev_io->internal.caller_ctx);
1155 			return;
1156 		}
1157 	}
1158 
1159 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1160 		rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1161 					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
1162 					    bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1163 					    _spdk_bdev_io_split_done, bdev_io);
1164 	} else {
1165 		rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1166 					     spdk_io_channel_from_ctx(bdev_io->internal.ch),
1167 					     bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1168 					     _spdk_bdev_io_split_done, bdev_io);
1169 	}
1170 
1171 	if (rc == 0) {
1172 		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
1173 		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
1174 	} else {
1175 		assert(rc == -ENOMEM);
1176 		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1177 		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_with_payload;
1178 		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1179 		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1180 					&bdev_io->internal.waitq_entry);
1181 	}
1182 }
1183 
1184 static void
1185 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1186 {
1187 	struct spdk_bdev_io *parent_io = cb_arg;
1188 
1189 	spdk_bdev_free_io(bdev_io);
1190 
1191 	if (!success) {
1192 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1193 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx);
1194 		return;
1195 	}
1196 
1197 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
1198 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1199 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx);
1200 		return;
1201 	}
1202 
1203 	/*
1204 	 * Continue with the splitting process.  This function will complete the parent I/O if the
1205 	 * splitting is done.
1206 	 */
1207 	_spdk_bdev_io_split_with_payload(parent_io);
1208 }
1209 
1210 static void
1211 _spdk_bdev_io_split(struct spdk_bdev_io *bdev_io)
1212 {
1213 	assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1214 
1215 	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1216 	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1217 
1218 	_spdk_bdev_io_split_with_payload(bdev_io);
1219 }
1220 
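/*
 * Core submission path.  The I/O is counted as outstanding and passed to the
 * bdev module, unless earlier NOMEM I/O is still queued, a reset is in
 * progress, or QoS is enabled on the channel, in which case it is queued or
 * failed accordingly.
 */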
1221 static void
1222 _spdk_bdev_io_submit(void *ctx)
1223 {
1224 	struct spdk_bdev_io *bdev_io = ctx;
1225 	struct spdk_bdev *bdev = bdev_io->bdev;
1226 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1227 	struct spdk_io_channel *ch = bdev_ch->channel;
1228 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1229 
1230 	bdev_io->internal.submit_tsc = spdk_get_ticks();
1231 	bdev_ch->io_outstanding++;
1232 	shared_resource->io_outstanding++;
1233 	bdev_io->internal.in_submit_request = true;
1234 	if (spdk_likely(bdev_ch->flags == 0)) {
1235 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1236 			bdev->fn_table->submit_request(ch, bdev_io);
1237 		} else {
1238 			bdev_ch->io_outstanding--;
1239 			shared_resource->io_outstanding--;
1240 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1241 		}
1242 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1243 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1244 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1245 		bdev_ch->io_outstanding--;
1246 		shared_resource->io_outstanding--;
1247 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1248 		_spdk_bdev_qos_io_submit(bdev_ch);
1249 	} else {
1250 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1251 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1252 	}
1253 	bdev_io->internal.in_submit_request = false;
1254 }
1255 
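/*
 * Entry point for I/O submission.  Splits the I/O if it crosses an optimal I/O
 * boundary; otherwise submits it directly, forwarding QoS-enabled I/O to the
 * QoS thread first when necessary.
 */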
1256 static void
1257 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1258 {
1259 	struct spdk_bdev *bdev = bdev_io->bdev;
1260 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1261 
1262 	assert(thread != NULL);
1263 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1264 
1265 	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1266 		_spdk_bdev_io_split(bdev_io);
1267 		return;
1268 	}
1269 
1270 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1271 		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1272 			_spdk_bdev_io_submit(bdev_io);
1273 		} else {
1274 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1275 			bdev_io->internal.ch = bdev->internal.qos->ch;
1276 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1277 		}
1278 	} else {
1279 		_spdk_bdev_io_submit(bdev_io);
1280 	}
1281 }
1282 
1283 static void
1284 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1285 {
1286 	struct spdk_bdev *bdev = bdev_io->bdev;
1287 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1288 	struct spdk_io_channel *ch = bdev_ch->channel;
1289 
1290 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1291 
1292 	bdev_io->internal.in_submit_request = true;
1293 	bdev->fn_table->submit_request(ch, bdev_io);
1294 	bdev_io->internal.in_submit_request = false;
1295 }
1296 
1297 static void
1298 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1299 		  struct spdk_bdev *bdev, void *cb_arg,
1300 		  spdk_bdev_io_completion_cb cb)
1301 {
1302 	bdev_io->bdev = bdev;
1303 	bdev_io->internal.caller_ctx = cb_arg;
1304 	bdev_io->internal.cb = cb;
1305 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1306 	bdev_io->internal.in_submit_request = false;
1307 	bdev_io->internal.buf = NULL;
1308 	bdev_io->internal.io_submit_ch = NULL;
1309 }
1310 
1311 static bool
1312 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1313 {
1314 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1315 }
1316 
1317 bool
1318 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1319 {
1320 	bool supported;
1321 
1322 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1323 
1324 	if (!supported) {
1325 		switch (io_type) {
1326 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1327 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1328 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1329 			break;
1330 		default:
1331 			break;
1332 		}
1333 	}
1334 
1335 	return supported;
1336 }
1337 
1338 int
1339 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1340 {
1341 	if (bdev->fn_table->dump_info_json) {
1342 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1343 	}
1344 
1345 	return 0;
1346 }
1347 
1348 void
1349 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1350 {
1351 	assert(bdev != NULL);
1352 	assert(w != NULL);
1353 
1354 	if (bdev->fn_table->write_config_json) {
1355 		bdev->fn_table->write_config_json(bdev, w);
1356 	} else {
1357 		spdk_json_write_object_begin(w);
1358 		spdk_json_write_named_string(w, "name", bdev->name);
1359 		spdk_json_write_object_end(w);
1360 	}
1361 }
1362 
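/*
 * Convert the per-second IOPS and byte rate limits into per-timeslice quotas,
 * never allowing a quota below the configured per-timeslice minimums.
 */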
1363 static void
1364 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1365 {
1366 	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;
1367 
1368 	if (qos->iops_rate_limit > 0) {
1369 		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1370 					SPDK_SEC_TO_USEC;
1371 		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
1372 						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1373 	}
1374 
1375 	if (qos->byte_rate_limit > 0) {
1376 		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1377 					 SPDK_SEC_TO_USEC;
1378 		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
1379 						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
1380 	}
1381 }
1382 
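/*
 * QoS poller, run once per timeslice: replenishes the I/O and byte quotas
 * (carrying over any byte overrun from the previous timeslice) and resubmits
 * queued I/O.
 */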
1383 static int
1384 spdk_bdev_channel_poll_qos(void *arg)
1385 {
1386 	struct spdk_bdev_qos *qos = arg;
1387 
1388 	/* Reset for next round of rate limiting */
1389 	qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice;
1390 
1391 	/* We may have allowed the bytes to slightly overrun in the last timeslice.
1392 	 * byte_remaining_this_timeslice is signed, so if it's negative here, we'll
1393 	 * account for the overrun so that the next timeslice will be appropriately
1394 	 * reduced.
1395 	 */
1396 	if (qos->byte_remaining_this_timeslice > 0) {
1397 		qos->byte_remaining_this_timeslice = 0;
1398 	}
1399 	qos->byte_remaining_this_timeslice += qos->max_byte_per_timeslice;
1400 
1401 	_spdk_bdev_qos_io_submit(qos->ch);
1402 
1403 	return -1;
1404 }
1405 
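/*
 * Release a bdev channel's underlying I/O channel and drop its reference on the
 * shared per-module resource, freeing that resource when the last reference is
 * gone.
 */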
1406 static void
1407 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1408 {
1409 	struct spdk_bdev_shared_resource *shared_resource;
1410 
1411 	if (!ch) {
1412 		return;
1413 	}
1414 
1415 	if (ch->channel) {
1416 		spdk_put_io_channel(ch->channel);
1417 	}
1418 
1419 	assert(ch->io_outstanding == 0);
1420 
1421 	shared_resource = ch->shared_resource;
1422 	if (shared_resource) {
1423 		assert(ch->io_outstanding == 0);
1424 		assert(shared_resource->ref > 0);
1425 		shared_resource->ref--;
1426 		if (shared_resource->ref == 0) {
1427 			assert(shared_resource->io_outstanding == 0);
1428 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1429 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1430 			free(shared_resource);
1431 		}
1432 	}
1433 }
1434 
1435 /* Caller must hold bdev->internal.mutex. */
1436 static void
1437 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1438 {
1439 	struct spdk_bdev_qos *qos = bdev->internal.qos;
1440 
1441 	/* Rate limiting is enabled on this bdev */
1442 	if (qos) {
1443 		if (qos->ch == NULL) {
1444 			struct spdk_io_channel *io_ch;
1445 
1446 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1447 				      bdev->name, spdk_get_thread());
1448 
1449 			/* No qos channel has been selected, so set one up */
1450 
1451 			/* Take another reference to ch */
1452 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1453 			qos->ch = ch;
1454 
1455 			qos->thread = spdk_io_channel_get_thread(io_ch);
1456 
1457 			TAILQ_INIT(&qos->queued);
1458 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1459 			qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice;
1460 			qos->byte_remaining_this_timeslice = qos->max_byte_per_timeslice;
1461 
1462 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1463 							   qos,
1464 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1465 		}
1466 
1467 		ch->flags |= BDEV_CH_QOS_ENABLED;
1468 	}
1469 }
1470 
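/*
 * I/O channel constructor for a bdev.  Gets the module's I/O channel, finds or
 * creates the shared resource tied to that channel on this thread's management
 * channel, and enables QoS on the new channel if the bdev has it configured.
 */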
1471 static int
1472 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1473 {
1474 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1475 	struct spdk_bdev_channel	*ch = ctx_buf;
1476 	struct spdk_io_channel		*mgmt_io_ch;
1477 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1478 	struct spdk_bdev_shared_resource *shared_resource;
1479 
1480 	ch->bdev = bdev;
1481 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1482 	if (!ch->channel) {
1483 		return -1;
1484 	}
1485 
1486 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1487 	if (!mgmt_io_ch) {
1488 		return -1;
1489 	}
1490 
1491 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1492 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1493 		if (shared_resource->shared_ch == ch->channel) {
1494 			spdk_put_io_channel(mgmt_io_ch);
1495 			shared_resource->ref++;
1496 			break;
1497 		}
1498 	}
1499 
1500 	if (shared_resource == NULL) {
1501 		shared_resource = calloc(1, sizeof(*shared_resource));
1502 		if (shared_resource == NULL) {
1503 			spdk_put_io_channel(mgmt_io_ch);
1504 			return -1;
1505 		}
1506 
1507 		shared_resource->mgmt_ch = mgmt_ch;
1508 		shared_resource->io_outstanding = 0;
1509 		TAILQ_INIT(&shared_resource->nomem_io);
1510 		shared_resource->nomem_threshold = 0;
1511 		shared_resource->shared_ch = ch->channel;
1512 		shared_resource->ref = 1;
1513 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1514 	}
1515 
1516 	memset(&ch->stat, 0, sizeof(ch->stat));
1517 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1518 	ch->io_outstanding = 0;
1519 	TAILQ_INIT(&ch->queued_resets);
1520 	ch->flags = 0;
1521 	ch->shared_resource = shared_resource;
1522 
1523 #ifdef SPDK_CONFIG_VTUNE
1524 	{
1525 		char *name;
1526 		__itt_init_ittlib(NULL, 0);
1527 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1528 		if (!name) {
1529 			_spdk_bdev_channel_destroy_resource(ch);
1530 			return -1;
1531 		}
1532 		ch->handle = __itt_string_handle_create(name);
1533 		free(name);
1534 		ch->start_tsc = spdk_get_ticks();
1535 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1536 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1537 	}
1538 #endif
1539 
1540 	pthread_mutex_lock(&bdev->internal.mutex);
1541 	_spdk_bdev_enable_qos(bdev, ch);
1542 	pthread_mutex_unlock(&bdev->internal.mutex);
1543 
1544 	return 0;
1545 }
1546 
1547 /*
1548  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1549  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1550  */
1551 static void
1552 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1553 {
1554 	bdev_io_stailq_t tmp;
1555 	struct spdk_bdev_io *bdev_io;
1556 
1557 	STAILQ_INIT(&tmp);
1558 
1559 	while (!STAILQ_EMPTY(queue)) {
1560 		bdev_io = STAILQ_FIRST(queue);
1561 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1562 		if (bdev_io->internal.ch == ch) {
1563 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1564 		} else {
1565 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1566 		}
1567 	}
1568 
1569 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1570 }
1571 
1572 /*
1573  * Abort I/O that are queued waiting for submission.  These types of I/O are
1574  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1575  */
1576 static void
1577 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1578 {
1579 	struct spdk_bdev_io *bdev_io, *tmp;
1580 
1581 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1582 		if (bdev_io->internal.ch == ch) {
1583 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1584 			/*
1585 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1586 			 *  been submitted to the bdev module.  Since in this case it
1587 			 *  hadn't, bump io_outstanding to account for the decrement
1588 			 *  that spdk_bdev_io_complete() will do.
1589 			 */
1590 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1591 				ch->io_outstanding++;
1592 				ch->shared_resource->io_outstanding++;
1593 			}
1594 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1595 		}
1596 	}
1597 }
1598 
1599 static void
1600 spdk_bdev_qos_channel_destroy(void *cb_arg)
1601 {
1602 	struct spdk_bdev_qos *qos = cb_arg;
1603 
1604 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1605 	spdk_poller_unregister(&qos->poller);
1606 
1607 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1608 
1609 	free(qos);
1610 }
1611 
1612 static int
1613 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1614 {
1615 	/*
1616 	 * Cleanly shutting down the QoS poller is tricky, because
1617 	 * during the asynchronous operation the user could open
1618 	 * a new descriptor and create a new channel, spawning
1619 	 * a new QoS poller.
1620 	 *
1621 	 * The strategy is to create a new QoS structure here and swap it
1622 	 * in. The shutdown path then continues to refer to the old one
1623 	 * until it completes and then releases it.
1624 	 */
1625 	struct spdk_bdev_qos *new_qos, *old_qos;
1626 
1627 	old_qos = bdev->internal.qos;
1628 
1629 	new_qos = calloc(1, sizeof(*new_qos));
1630 	if (!new_qos) {
1631 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1632 		return -ENOMEM;
1633 	}
1634 
1635 	/* Copy the old QoS data into the newly allocated structure */
1636 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1637 
1638 	/* Zero out the key parts of the QoS structure */
1639 	new_qos->ch = NULL;
1640 	new_qos->thread = NULL;
1641 	new_qos->max_ios_per_timeslice = 0;
1642 	new_qos->max_byte_per_timeslice = 0;
1643 	new_qos->io_remaining_this_timeslice = 0;
1644 	new_qos->byte_remaining_this_timeslice = 0;
1645 	new_qos->poller = NULL;
1646 	TAILQ_INIT(&new_qos->queued);
1647 
1648 	bdev->internal.qos = new_qos;
1649 
1650 	if (old_qos->thread == NULL) {
1651 		free(old_qos);
1652 	} else {
1653 		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1654 				     old_qos);
1655 	}
1656 
1657 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1658 	 * been destroyed yet. The destruction path will end up waiting for the final
1659 	 * channel to be put before it releases resources. */
1660 
1661 	return 0;
1662 }
1663 
1664 static void
1665 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
1666 {
1667 	total->bytes_read += add->bytes_read;
1668 	total->num_read_ops += add->num_read_ops;
1669 	total->bytes_written += add->bytes_written;
1670 	total->num_write_ops += add->num_write_ops;
1671 	total->read_latency_ticks += add->read_latency_ticks;
1672 	total->write_latency_ticks += add->write_latency_ticks;
1673 }
1674 
1675 static void
1676 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1677 {
1678 	struct spdk_bdev_channel	*ch = ctx_buf;
1679 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1680 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1681 
1682 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1683 		      spdk_get_thread());
1684 
1685 	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
1686 	pthread_mutex_lock(&ch->bdev->internal.mutex);
1687 	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1688 	pthread_mutex_unlock(&ch->bdev->internal.mutex);
1689 
1690 	mgmt_ch = shared_resource->mgmt_ch;
1691 
1692 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1693 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1694 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1695 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1696 
1697 	_spdk_bdev_channel_destroy_resource(ch);
1698 }
1699 
1700 int
1701 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1702 {
1703 	struct spdk_bdev_alias *tmp;
1704 
1705 	if (alias == NULL) {
1706 		SPDK_ERRLOG("Empty alias passed\n");
1707 		return -EINVAL;
1708 	}
1709 
1710 	if (spdk_bdev_get_by_name(alias)) {
1711 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1712 		return -EEXIST;
1713 	}
1714 
1715 	tmp = calloc(1, sizeof(*tmp));
1716 	if (tmp == NULL) {
1717 		SPDK_ERRLOG("Unable to allocate alias\n");
1718 		return -ENOMEM;
1719 	}
1720 
1721 	tmp->alias = strdup(alias);
1722 	if (tmp->alias == NULL) {
1723 		free(tmp);
1724 		SPDK_ERRLOG("Unable to allocate alias\n");
1725 		return -ENOMEM;
1726 	}
1727 
1728 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1729 
1730 	return 0;
1731 }
1732 
1733 int
1734 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1735 {
1736 	struct spdk_bdev_alias *tmp;
1737 
1738 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1739 		if (strcmp(alias, tmp->alias) == 0) {
1740 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1741 			free(tmp->alias);
1742 			free(tmp);
1743 			return 0;
1744 		}
1745 	}
1746 
1747 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1748 
1749 	return -ENOENT;
1750 }
1751 
1752 void
1753 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1754 {
1755 	struct spdk_bdev_alias *p, *tmp;
1756 
1757 	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1758 		TAILQ_REMOVE(&bdev->aliases, p, tailq);
1759 		free(p->alias);
1760 		free(p);
1761 	}
1762 }
1763 
1764 struct spdk_io_channel *
1765 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1766 {
1767 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1768 }
1769 
1770 const char *
1771 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1772 {
1773 	return bdev->name;
1774 }
1775 
1776 const char *
1777 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1778 {
1779 	return bdev->product_name;
1780 }
1781 
1782 const struct spdk_bdev_aliases_list *
1783 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1784 {
1785 	return &bdev->aliases;
1786 }
1787 
1788 uint32_t
1789 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1790 {
1791 	return bdev->blocklen;
1792 }
1793 
1794 uint64_t
1795 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1796 {
1797 	return bdev->blockcnt;
1798 }
1799 
1800 uint64_t
1801 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
1802 {
1803 	uint64_t iops_rate_limit = 0;
1804 
1805 	pthread_mutex_lock(&bdev->internal.mutex);
1806 	if (bdev->internal.qos) {
1807 		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
1808 	}
1809 	pthread_mutex_unlock(&bdev->internal.mutex);
1810 
1811 	return iops_rate_limit;
1812 }
1813 
1814 size_t
1815 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1816 {
1817 	/* TODO: push this logic down to the bdev modules */
1818 	if (bdev->need_aligned_buffer) {
1819 		return bdev->blocklen;
1820 	}
1821 
1822 	return 1;
1823 }
1824 
1825 uint32_t
1826 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1827 {
1828 	return bdev->optimal_io_boundary;
1829 }
1830 
1831 bool
1832 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1833 {
1834 	return bdev->write_cache;
1835 }
1836 
1837 const struct spdk_uuid *
1838 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1839 {
1840 	return &bdev->uuid;
1841 }
1842 
1843 uint64_t
1844 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
1845 {
1846 	return bdev->internal.measured_queue_depth;
1847 }
1848 
1849 uint64_t
1850 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
1851 {
1852 	return bdev->internal.period;
1853 }
1854 
1855 uint64_t
1856 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
1857 {
1858 	return bdev->internal.weighted_io_time;
1859 }
1860 
1861 uint64_t
1862 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
1863 {
1864 	return bdev->internal.io_time;
1865 }
1866 
1867 static void
1868 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
1869 {
1870 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
1871 
1872 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
1873 
1874 	if (bdev->internal.measured_queue_depth) {
1875 		bdev->internal.io_time += bdev->internal.period;
1876 		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
1877 	}
1878 }
1879 
1880 static void
1881 _calculate_measured_qd(struct spdk_io_channel_iter *i)
1882 {
1883 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
1884 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
1885 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
1886 
1887 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
1888 	spdk_for_each_channel_continue(i, 0);
1889 }
1890 
1891 static int
1892 spdk_bdev_calculate_measured_queue_depth(void *ctx)
1893 {
1894 	struct spdk_bdev *bdev = ctx;
1895 	bdev->internal.temporary_queue_depth = 0;
1896 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
1897 			      _calculate_measured_qd_cpl);
1898 	return 0;
1899 }
1900 
1901 void
1902 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
1903 {
1904 	bdev->internal.period = period;
1905 
1906 	if (bdev->internal.qd_poller != NULL) {
1907 		spdk_poller_unregister(&bdev->internal.qd_poller);
1908 		bdev->internal.measured_queue_depth = UINT64_MAX;
1909 	}
1910 
1911 	if (period != 0) {
1912 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
1913 					   period);
1914 	}
1915 }
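
/*
 * Queue depth sampling sketch (illustrative). The period is handed straight to
 * spdk_poller_register(), so it is expressed in microseconds; a period of 0 stops the poller,
 * and spdk_bdev_get_qd() reports UINT64_MAX until a sample has completed:
 *
 *	spdk_bdev_set_qd_sampling_period(bdev, 1000);	// sample once per millisecond
 *	...
 *	uint64_t qd = spdk_bdev_get_qd(bdev);		// sum of io_outstanding across channels
 *	spdk_bdev_set_qd_sampling_period(bdev, 0);	// stop sampling
 */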
1916 
1917 int
1918 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1919 {
1920 	int ret;
1921 
1922 	pthread_mutex_lock(&bdev->internal.mutex);
1923 
1924 	/* bdev has open descriptors */
1925 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
1926 	    bdev->blockcnt > size) {
1927 		ret = -EBUSY;
1928 	} else {
1929 		bdev->blockcnt = size;
1930 		ret = 0;
1931 	}
1932 
1933 	pthread_mutex_unlock(&bdev->internal.mutex);
1934 
1935 	return ret;
1936 }
1937 
1938 /*
1939  * Convert I/O offset and length from bytes to blocks.
1940  *
1941  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1942  */
1943 static uint64_t
1944 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1945 			  uint64_t num_bytes, uint64_t *num_blocks)
1946 {
1947 	uint32_t block_size = bdev->blocklen;
1948 
1949 	*offset_blocks = offset_bytes / block_size;
1950 	*num_blocks = num_bytes / block_size;
1951 
1952 	return (offset_bytes % block_size) | (num_bytes % block_size);
1953 }
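
/*
 * Worked example of the conversion above, assuming a hypothetical 512-byte block size:
 * offset_bytes = 4096, num_bytes = 1024 -> offset_blocks = 8, num_blocks = 2, return value 0;
 * offset_bytes = 4097, num_bytes = 1024 -> return value (4097 % 512) | (1024 % 512) = 1, which
 * the callers below reject with -EINVAL.
 */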
1954 
1955 static bool
1956 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1957 {
1958 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
1959 	 * that the addition overflowed and the offset wrapped around. */
1960 	if (offset_blocks + num_blocks < offset_blocks) {
1961 		return false;
1962 	}
1963 
1964 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1965 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1966 		return false;
1967 	}
1968 
1969 	return true;
1970 }
1971 
1972 int
1973 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1974 	       void *buf, uint64_t offset, uint64_t nbytes,
1975 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1976 {
1977 	uint64_t offset_blocks, num_blocks;
1978 
1979 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1980 		return -EINVAL;
1981 	}
1982 
1983 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1984 }
1985 
1986 int
1987 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1988 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1989 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1990 {
1991 	struct spdk_bdev *bdev = desc->bdev;
1992 	struct spdk_bdev_io *bdev_io;
1993 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1994 
1995 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1996 		return -EINVAL;
1997 	}
1998 
1999 	bdev_io = spdk_bdev_get_io(channel);
2000 	if (!bdev_io) {
2001 		return -ENOMEM;
2002 	}
2003 
2004 	bdev_io->internal.ch = channel;
2005 	bdev_io->internal.desc = desc;
2006 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2007 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2008 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2009 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2010 	bdev_io->u.bdev.iovcnt = 1;
2011 	bdev_io->u.bdev.num_blocks = num_blocks;
2012 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2013 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2014 
2015 	spdk_bdev_io_submit(bdev_io);
2016 	return 0;
2017 }
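
/*
 * Minimal read sketch (illustrative; the buffer, offsets, and callback are hypothetical).
 * The completion callback runs on the thread that owns the channel and must release the I/O:
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	// ch was obtained earlier on this thread via spdk_bdev_get_io_channel(desc)
 *	int rc = spdk_bdev_read_blocks(desc, ch, buf, 0, 8, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// no spdk_bdev_io available right now; retry via spdk_bdev_queue_io_wait()
 *	}
 */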
2018 
2019 int
2020 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2021 		struct iovec *iov, int iovcnt,
2022 		uint64_t offset, uint64_t nbytes,
2023 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2024 {
2025 	uint64_t offset_blocks, num_blocks;
2026 
2027 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2028 		return -EINVAL;
2029 	}
2030 
2031 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2032 }
2033 
2034 int
2035 spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2036 			struct iovec *iov, int iovcnt, uint64_t offset_blocks, uint64_t num_blocks,
2037 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2038 {
2039 	struct spdk_bdev *bdev = desc->bdev;
2040 	struct spdk_bdev_io *bdev_io;
2041 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2042 
2043 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2044 		return -EINVAL;
2045 	}
2046 
2047 	bdev_io = spdk_bdev_get_io(channel);
2048 	if (!bdev_io) {
2049 		return -ENOMEM;
2050 	}
2051 
2052 	bdev_io->internal.ch = channel;
2053 	bdev_io->internal.desc = desc;
2054 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2055 	bdev_io->u.bdev.iovs = iov;
2056 	bdev_io->u.bdev.iovcnt = iovcnt;
2057 	bdev_io->u.bdev.num_blocks = num_blocks;
2058 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2059 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2060 
2061 	spdk_bdev_io_submit(bdev_io);
2062 	return 0;
2063 }
2064 
2065 int
2066 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2067 		void *buf, uint64_t offset, uint64_t nbytes,
2068 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2069 {
2070 	uint64_t offset_blocks, num_blocks;
2071 
2072 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2073 		return -EINVAL;
2074 	}
2075 
2076 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2077 }
2078 
2079 int
2080 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2081 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2082 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2083 {
2084 	struct spdk_bdev *bdev = desc->bdev;
2085 	struct spdk_bdev_io *bdev_io;
2086 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2087 
2088 	if (!desc->write) {
2089 		return -EBADF;
2090 	}
2091 
2092 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2093 		return -EINVAL;
2094 	}
2095 
2096 	bdev_io = spdk_bdev_get_io(channel);
2097 	if (!bdev_io) {
2098 		return -ENOMEM;
2099 	}
2100 
2101 	bdev_io->internal.ch = channel;
2102 	bdev_io->internal.desc = desc;
2103 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2104 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2105 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2106 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2107 	bdev_io->u.bdev.iovcnt = 1;
2108 	bdev_io->u.bdev.num_blocks = num_blocks;
2109 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2110 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2111 
2112 	spdk_bdev_io_submit(bdev_io);
2113 	return 0;
2114 }
2115 
2116 int
2117 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2118 		 struct iovec *iov, int iovcnt,
2119 		 uint64_t offset, uint64_t len,
2120 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
2121 {
2122 	uint64_t offset_blocks, num_blocks;
2123 
2124 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2125 		return -EINVAL;
2126 	}
2127 
2128 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2129 }
2130 
2131 int
2132 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2133 			struct iovec *iov, int iovcnt,
2134 			uint64_t offset_blocks, uint64_t num_blocks,
2135 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2136 {
2137 	struct spdk_bdev *bdev = desc->bdev;
2138 	struct spdk_bdev_io *bdev_io;
2139 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2140 
2141 	if (!desc->write) {
2142 		return -EBADF;
2143 	}
2144 
2145 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2146 		return -EINVAL;
2147 	}
2148 
2149 	bdev_io = spdk_bdev_get_io(channel);
2150 	if (!bdev_io) {
2151 		return -ENOMEM;
2152 	}
2153 
2154 	bdev_io->internal.ch = channel;
2155 	bdev_io->internal.desc = desc;
2156 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2157 	bdev_io->u.bdev.iovs = iov;
2158 	bdev_io->u.bdev.iovcnt = iovcnt;
2159 	bdev_io->u.bdev.num_blocks = num_blocks;
2160 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2161 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2162 
2163 	spdk_bdev_io_submit(bdev_io);
2164 	return 0;
2165 }
2166 
2167 int
2168 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2169 		       uint64_t offset, uint64_t len,
2170 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2171 {
2172 	uint64_t offset_blocks, num_blocks;
2173 
2174 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2175 		return -EINVAL;
2176 	}
2177 
2178 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2179 }
2180 
2181 int
2182 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2183 			      uint64_t offset_blocks, uint64_t num_blocks,
2184 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2185 {
2186 	struct spdk_bdev *bdev = desc->bdev;
2187 	struct spdk_bdev_io *bdev_io;
2188 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2189 
2190 	if (!desc->write) {
2191 		return -EBADF;
2192 	}
2193 
2194 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2195 		return -EINVAL;
2196 	}
2197 
2198 	bdev_io = spdk_bdev_get_io(channel);
2199 
2200 	if (!bdev_io) {
2201 		return -ENOMEM;
2202 	}
2203 
2204 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2205 	bdev_io->internal.ch = channel;
2206 	bdev_io->internal.desc = desc;
2207 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2208 	bdev_io->u.bdev.num_blocks = num_blocks;
2209 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2210 
2211 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2212 		spdk_bdev_io_submit(bdev_io);
2213 		return 0;
2214 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2215 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2216 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2217 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2218 		_spdk_bdev_write_zero_buffer_next(bdev_io);
2219 		return 0;
2220 	} else {
2221 		spdk_bdev_free_io(bdev_io);
2222 		return -ENOTSUP;
2223 	}
2224 }
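
/*
 * When WRITE_ZEROES is not natively supported, the request above is emulated with plain writes
 * of g_bdev_mgr.zero_buffer, at most ZERO_BUFFER_SIZE (1 MiB) per pass. For example, zeroing
 * 4096 blocks of a hypothetical 512-byte-block bdev (2 MiB total) takes two chained writes,
 * driven by _spdk_bdev_write_zero_buffer_next()/_done() later in this file.
 */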
2225 
2226 int
2227 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2228 		uint64_t offset, uint64_t nbytes,
2229 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2230 {
2231 	uint64_t offset_blocks, num_blocks;
2232 
2233 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2234 		return -EINVAL;
2235 	}
2236 
2237 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2238 }
2239 
2240 int
2241 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2242 		       uint64_t offset_blocks, uint64_t num_blocks,
2243 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2244 {
2245 	struct spdk_bdev *bdev = desc->bdev;
2246 	struct spdk_bdev_io *bdev_io;
2247 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2248 
2249 	if (!desc->write) {
2250 		return -EBADF;
2251 	}
2252 
2253 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2254 		return -EINVAL;
2255 	}
2256 
2257 	if (num_blocks == 0) {
2258 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
2259 		return -EINVAL;
2260 	}
2261 
2262 	bdev_io = spdk_bdev_get_io(channel);
2263 	if (!bdev_io) {
2264 		return -ENOMEM;
2265 	}
2266 
2267 	bdev_io->internal.ch = channel;
2268 	bdev_io->internal.desc = desc;
2269 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2270 
2271 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2272 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2273 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2274 	bdev_io->u.bdev.iovcnt = 1;
2275 
2276 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2277 	bdev_io->u.bdev.num_blocks = num_blocks;
2278 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2279 
2280 	spdk_bdev_io_submit(bdev_io);
2281 	return 0;
2282 }
2283 
2284 int
2285 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2286 		uint64_t offset, uint64_t length,
2287 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2288 {
2289 	uint64_t offset_blocks, num_blocks;
2290 
2291 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2292 		return -EINVAL;
2293 	}
2294 
2295 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2296 }
2297 
2298 int
2299 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2300 		       uint64_t offset_blocks, uint64_t num_blocks,
2301 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2302 {
2303 	struct spdk_bdev *bdev = desc->bdev;
2304 	struct spdk_bdev_io *bdev_io;
2305 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2306 
2307 	if (!desc->write) {
2308 		return -EBADF;
2309 	}
2310 
2311 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2312 		return -EINVAL;
2313 	}
2314 
2315 	bdev_io = spdk_bdev_get_io(channel);
2316 	if (!bdev_io) {
2317 		return -ENOMEM;
2318 	}
2319 
2320 	bdev_io->internal.ch = channel;
2321 	bdev_io->internal.desc = desc;
2322 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2323 	bdev_io->u.bdev.iovs = NULL;
2324 	bdev_io->u.bdev.iovcnt = 0;
2325 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2326 	bdev_io->u.bdev.num_blocks = num_blocks;
2327 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2328 
2329 	spdk_bdev_io_submit(bdev_io);
2330 	return 0;
2331 }
2332 
2333 static void
2334 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2335 {
2336 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2337 	struct spdk_bdev_io *bdev_io;
2338 
2339 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2340 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2341 	spdk_bdev_io_submit_reset(bdev_io);
2342 }
2343 
2344 static void
2345 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2346 {
2347 	struct spdk_io_channel		*ch;
2348 	struct spdk_bdev_channel	*channel;
2349 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2350 	struct spdk_bdev_shared_resource *shared_resource;
2351 	bdev_io_tailq_t			tmp_queued;
2352 
2353 	TAILQ_INIT(&tmp_queued);
2354 
2355 	ch = spdk_io_channel_iter_get_channel(i);
2356 	channel = spdk_io_channel_get_ctx(ch);
2357 	shared_resource = channel->shared_resource;
2358 	mgmt_channel = shared_resource->mgmt_ch;
2359 
2360 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2361 
2362 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2363 		/* The QoS object is always valid and readable while
2364 		 * the channel flag is set, so the lock here should not
2365 		 * be necessary. We're not in the fast path though, so
2366 		 * just take it anyway. */
2367 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2368 		if (channel->bdev->internal.qos->ch == channel) {
2369 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2370 		}
2371 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2372 	}
2373 
2374 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2375 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2376 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2377 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2378 
2379 	spdk_for_each_channel_continue(i, 0);
2380 }
2381 
2382 static void
2383 _spdk_bdev_start_reset(void *ctx)
2384 {
2385 	struct spdk_bdev_channel *ch = ctx;
2386 
2387 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2388 			      ch, _spdk_bdev_reset_dev);
2389 }
2390 
2391 static void
2392 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2393 {
2394 	struct spdk_bdev *bdev = ch->bdev;
2395 
2396 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2397 
2398 	pthread_mutex_lock(&bdev->internal.mutex);
2399 	if (bdev->internal.reset_in_progress == NULL) {
2400 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2401 		/*
2402 		 * Take a channel reference for the target bdev for the life of this
2403 		 *  reset.  This guards against the channel getting destroyed while
2404 		 *  spdk_for_each_channel() calls related to this reset IO are in
2405 		 *  progress.  We will release the reference when this reset is
2406 		 *  completed.
2407 		 */
2408 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2409 		_spdk_bdev_start_reset(ch);
2410 	}
2411 	pthread_mutex_unlock(&bdev->internal.mutex);
2412 }
2413 
2414 int
2415 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2416 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2417 {
2418 	struct spdk_bdev *bdev = desc->bdev;
2419 	struct spdk_bdev_io *bdev_io;
2420 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2421 
2422 	bdev_io = spdk_bdev_get_io(channel);
2423 	if (!bdev_io) {
2424 		return -ENOMEM;
2425 	}
2426 
2427 	bdev_io->internal.ch = channel;
2428 	bdev_io->internal.desc = desc;
2429 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2430 	bdev_io->u.reset.ch_ref = NULL;
2431 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2432 
2433 	pthread_mutex_lock(&bdev->internal.mutex);
2434 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2435 	pthread_mutex_unlock(&bdev->internal.mutex);
2436 
2437 	_spdk_bdev_channel_start_reset(channel);
2438 
2439 	return 0;
2440 }
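
/*
 * Reset sketch (illustrative callback). A reset freezes every channel of the bdev, aborts I/O
 * still queued in the generic bdev layer (nomem, buffer-wait and QoS queues), and only then is
 * forwarded to the module:
 *
 *	static void
 *	reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	spdk_bdev_reset(desc, ch, reset_done, NULL);
 */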
2441 
2442 void
2443 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2444 		      struct spdk_bdev_io_stat *stat)
2445 {
2446 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2447 
2448 	*stat = channel->stat;
2449 }
2450 
2451 static void
2452 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2453 {
2454 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2455 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2456 
2457 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2458 			    bdev_iostat_ctx->cb_arg, 0);
2459 	free(bdev_iostat_ctx);
2460 }
2461 
2462 static void
2463 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2464 {
2465 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2466 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2467 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2468 
2469 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
2470 	spdk_for_each_channel_continue(i, 0);
2471 }
2472 
2473 void
2474 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2475 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2476 {
2477 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2478 
2479 	assert(bdev != NULL);
2480 	assert(stat != NULL);
2481 	assert(cb != NULL);
2482 
2483 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2484 	if (bdev_iostat_ctx == NULL) {
2485 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2486 		cb(bdev, stat, cb_arg, -ENOMEM);
2487 		return;
2488 	}
2489 
2490 	bdev_iostat_ctx->stat = stat;
2491 	bdev_iostat_ctx->cb = cb;
2492 	bdev_iostat_ctx->cb_arg = cb_arg;
2493 
2494 	/* Start with the statistics from previously deleted channels. */
2495 	pthread_mutex_lock(&bdev->internal.mutex);
2496 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
2497 	pthread_mutex_unlock(&bdev->internal.mutex);
2498 
2499 	/* Then iterate and add the statistics from each existing channel. */
2500 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2501 			      _spdk_bdev_get_each_channel_stat,
2502 			      bdev_iostat_ctx,
2503 			      _spdk_bdev_get_device_stat_done);
2504 }
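
/*
 * Aggregated statistics sketch (illustrative; `stat_done` is hypothetical). The stat buffer
 * must remain valid until the callback fires, since channels are iterated asynchronously:
 *
 *	static void
 *	stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("%s: %" PRIu64 " reads, %" PRIu64 " writes\n",
 *			       spdk_bdev_get_name(bdev), stat->num_read_ops, stat->num_write_ops);
 *		}
 *		free(stat);
 *	}
 *
 *	struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
 *	spdk_bdev_get_device_stat(bdev, stat, stat_done, NULL);
 */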
2505 
2506 int
2507 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2508 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2509 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2510 {
2511 	struct spdk_bdev *bdev = desc->bdev;
2512 	struct spdk_bdev_io *bdev_io;
2513 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2514 
2515 	if (!desc->write) {
2516 		return -EBADF;
2517 	}
2518 
2519 	bdev_io = spdk_bdev_get_io(channel);
2520 	if (!bdev_io) {
2521 		return -ENOMEM;
2522 	}
2523 
2524 	bdev_io->internal.ch = channel;
2525 	bdev_io->internal.desc = desc;
2526 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2527 	bdev_io->u.nvme_passthru.cmd = *cmd;
2528 	bdev_io->u.nvme_passthru.buf = buf;
2529 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2530 	bdev_io->u.nvme_passthru.md_buf = NULL;
2531 	bdev_io->u.nvme_passthru.md_len = 0;
2532 
2533 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2534 
2535 	spdk_bdev_io_submit(bdev_io);
2536 	return 0;
2537 }
2538 
2539 int
2540 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2541 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2542 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2543 {
2544 	struct spdk_bdev *bdev = desc->bdev;
2545 	struct spdk_bdev_io *bdev_io;
2546 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2547 
2548 	if (!desc->write) {
2549 		/*
2550 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2551 		 *  to easily determine if the command is a read or write, but for now just
2552 		 *  do not allow io_passthru with a read-only descriptor.
2553 		 */
2554 		return -EBADF;
2555 	}
2556 
2557 	bdev_io = spdk_bdev_get_io(channel);
2558 	if (!bdev_io) {
2559 		return -ENOMEM;
2560 	}
2561 
2562 	bdev_io->internal.ch = channel;
2563 	bdev_io->internal.desc = desc;
2564 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2565 	bdev_io->u.nvme_passthru.cmd = *cmd;
2566 	bdev_io->u.nvme_passthru.buf = buf;
2567 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2568 	bdev_io->u.nvme_passthru.md_buf = NULL;
2569 	bdev_io->u.nvme_passthru.md_len = 0;
2570 
2571 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2572 
2573 	spdk_bdev_io_submit(bdev_io);
2574 	return 0;
2575 }
2576 
2577 int
2578 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2579 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2580 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2581 {
2582 	struct spdk_bdev *bdev = desc->bdev;
2583 	struct spdk_bdev_io *bdev_io;
2584 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2585 
2586 	if (!desc->write) {
2587 		/*
2588 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2589 		 *  to easily determine if the command is a read or write, but for now just
2590 		 *  do not allow io_passthru with a read-only descriptor.
2591 		 */
2592 		return -EBADF;
2593 	}
2594 
2595 	bdev_io = spdk_bdev_get_io(channel);
2596 	if (!bdev_io) {
2597 		return -ENOMEM;
2598 	}
2599 
2600 	bdev_io->internal.ch = channel;
2601 	bdev_io->internal.desc = desc;
2602 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2603 	bdev_io->u.nvme_passthru.cmd = *cmd;
2604 	bdev_io->u.nvme_passthru.buf = buf;
2605 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2606 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2607 	bdev_io->u.nvme_passthru.md_len = md_len;
2608 
2609 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2610 
2611 	spdk_bdev_io_submit(bdev_io);
2612 	return 0;
2613 }
2614 
2615 int
2616 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2617 			struct spdk_bdev_io_wait_entry *entry)
2618 {
2619 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2620 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2621 
2622 	if (bdev != entry->bdev) {
2623 		SPDK_ERRLOG("bdevs do not match\n");
2624 		return -EINVAL;
2625 	}
2626 
2627 	if (mgmt_ch->per_thread_cache_count > 0) {
2628 		SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2629 		return -EINVAL;
2630 	}
2631 
2632 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2633 	return 0;
2634 }
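
/*
 * Typical -ENOMEM retry pattern built on the wait entry above (illustrative; `ctx` and
 * retry_read() are hypothetical, mirroring _spdk_bdev_write_zero_buffer_next() later in this
 * file). The entry must carry everything needed to resubmit, and the same bdev/channel pair
 * must be passed back in:
 *
 *	static void retry_read(void *arg);	// resubmits the read using `arg`
 *
 *	if (spdk_bdev_read_blocks(desc, ch, buf, offset, num, read_done, ctx) == -ENOMEM) {
 *		ctx->wait_entry.bdev = spdk_bdev_desc_get_bdev(desc);
 *		ctx->wait_entry.cb_fn = retry_read;
 *		ctx->wait_entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(ctx->wait_entry.bdev, ch, &ctx->wait_entry);
 *	}
 */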
2635 
2636 static void
2637 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2638 {
2639 	struct spdk_bdev *bdev = bdev_ch->bdev;
2640 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2641 	struct spdk_bdev_io *bdev_io;
2642 
2643 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2644 		/*
2645 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2646 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2647 		 *  the context of a completion, because the resources for the I/O are
2648 		 *  not released until control returns to the bdev poller.  Also, we
2649 		 *  may require several small I/O to complete before a larger I/O
2650 		 *  (that requires splitting) can be submitted.
2651 		 */
2652 		return;
2653 	}
2654 
2655 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2656 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2657 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2658 		bdev_io->internal.ch->io_outstanding++;
2659 		shared_resource->io_outstanding++;
2660 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2661 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2662 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2663 			break;
2664 		}
2665 	}
2666 }
2667 
2668 static inline void
2669 _spdk_bdev_io_complete(void *ctx)
2670 {
2671 	struct spdk_bdev_io *bdev_io = ctx;
2672 
2673 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2674 		/*
2675 		 * Send the completion to the thread that originally submitted the I/O,
2676 		 * which may not be the current thread in the case of QoS.
2677 		 */
2678 		if (bdev_io->internal.io_submit_ch) {
2679 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2680 			bdev_io->internal.io_submit_ch = NULL;
2681 		}
2682 
2683 		/*
2684 		 * Defer completion to avoid potential infinite recursion if the
2685 		 * user's completion callback issues a new I/O.
2686 		 */
2687 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2688 				     _spdk_bdev_io_complete, bdev_io);
2689 		return;
2690 	}
2691 
2692 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2693 		switch (bdev_io->type) {
2694 		case SPDK_BDEV_IO_TYPE_READ:
2695 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2696 			bdev_io->internal.ch->stat.num_read_ops++;
2697 			bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2698 			break;
2699 		case SPDK_BDEV_IO_TYPE_WRITE:
2700 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2701 			bdev_io->internal.ch->stat.num_write_ops++;
2702 			bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2703 			break;
2704 		default:
2705 			break;
2706 		}
2707 	}
2708 
2709 #ifdef SPDK_CONFIG_VTUNE
2710 	uint64_t now_tsc = spdk_get_ticks();
2711 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2712 		uint64_t data[5];
2713 
2714 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2715 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2716 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2717 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2718 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2719 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2720 
2721 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2722 				   __itt_metadata_u64, 5, data);
2723 
2724 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2725 		bdev_io->internal.ch->start_tsc = now_tsc;
2726 	}
2727 #endif
2728 
2729 	assert(bdev_io->internal.cb != NULL);
2730 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2731 
2732 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2733 			     bdev_io->internal.caller_ctx);
2734 }
2735 
2736 static void
2737 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2738 {
2739 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2740 
2741 	if (bdev_io->u.reset.ch_ref != NULL) {
2742 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2743 		bdev_io->u.reset.ch_ref = NULL;
2744 	}
2745 
2746 	_spdk_bdev_io_complete(bdev_io);
2747 }
2748 
2749 static void
2750 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2751 {
2752 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2753 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2754 
2755 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2756 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2757 		_spdk_bdev_channel_start_reset(ch);
2758 	}
2759 
2760 	spdk_for_each_channel_continue(i, 0);
2761 }
2762 
2763 void
2764 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2765 {
2766 	struct spdk_bdev *bdev = bdev_io->bdev;
2767 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2768 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2769 
2770 	bdev_io->internal.status = status;
2771 
2772 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2773 		bool unlock_channels = false;
2774 
2775 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2776 			SPDK_ERRLOG("NOMEM returned for reset\n");
2777 		}
2778 		pthread_mutex_lock(&bdev->internal.mutex);
2779 		if (bdev_io == bdev->internal.reset_in_progress) {
2780 			bdev->internal.reset_in_progress = NULL;
2781 			unlock_channels = true;
2782 		}
2783 		pthread_mutex_unlock(&bdev->internal.mutex);
2784 
2785 		if (unlock_channels) {
2786 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2787 					      bdev_io, _spdk_bdev_reset_complete);
2788 			return;
2789 		}
2790 	} else {
2791 		assert(bdev_ch->io_outstanding > 0);
2792 		assert(shared_resource->io_outstanding > 0);
2793 		bdev_ch->io_outstanding--;
2794 		shared_resource->io_outstanding--;
2795 
2796 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2797 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2798 			/*
2799 			 * Wait for some of the outstanding I/O to complete before we
2800 			 *  retry any of the nomem_io.  Normally we will wait for
2801 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2802 			 *  depth channels we will instead wait for half to complete.
2803 			 */
2804 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2805 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2806 			return;
2807 		}
2808 
2809 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2810 			_spdk_bdev_ch_retry_io(bdev_ch);
2811 		}
2812 	}
2813 
2814 	_spdk_bdev_io_complete(bdev_io);
2815 }
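
/*
 * Worked example of the nomem_threshold computed above, with NOMEM_THRESHOLD_COUNT = 8: if
 * 100 I/O are outstanding when -ENOMEM is first returned, retries begin once outstanding I/O
 * drops to max(100 / 2, 100 - 8) = 92; with only 6 outstanding, the threshold is
 * max(3, -2) = 3, i.e. half of the shallow queue must drain first.
 */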
2816 
2817 void
2818 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2819 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2820 {
2821 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2822 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2823 	} else {
2824 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2825 		bdev_io->internal.error.scsi.sc = sc;
2826 		bdev_io->internal.error.scsi.sk = sk;
2827 		bdev_io->internal.error.scsi.asc = asc;
2828 		bdev_io->internal.error.scsi.ascq = ascq;
2829 	}
2830 
2831 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2832 }
2833 
2834 void
2835 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2836 			     int *sc, int *sk, int *asc, int *ascq)
2837 {
2838 	assert(sc != NULL);
2839 	assert(sk != NULL);
2840 	assert(asc != NULL);
2841 	assert(ascq != NULL);
2842 
2843 	switch (bdev_io->internal.status) {
2844 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2845 		*sc = SPDK_SCSI_STATUS_GOOD;
2846 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2847 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2848 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2849 		break;
2850 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2851 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2852 		break;
2853 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2854 		*sc = bdev_io->internal.error.scsi.sc;
2855 		*sk = bdev_io->internal.error.scsi.sk;
2856 		*asc = bdev_io->internal.error.scsi.asc;
2857 		*ascq = bdev_io->internal.error.scsi.ascq;
2858 		break;
2859 	default:
2860 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2861 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2862 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2863 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2864 		break;
2865 	}
2866 }
2867 
2868 void
2869 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2870 {
2871 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2872 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2873 	} else {
2874 		bdev_io->internal.error.nvme.sct = sct;
2875 		bdev_io->internal.error.nvme.sc = sc;
2876 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2877 	}
2878 
2879 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2880 }
2881 
2882 void
2883 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2884 {
2885 	assert(sct != NULL);
2886 	assert(sc != NULL);
2887 
2888 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2889 		*sct = bdev_io->internal.error.nvme.sct;
2890 		*sc = bdev_io->internal.error.nvme.sc;
2891 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2892 		*sct = SPDK_NVME_SCT_GENERIC;
2893 		*sc = SPDK_NVME_SC_SUCCESS;
2894 	} else {
2895 		*sct = SPDK_NVME_SCT_GENERIC;
2896 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2897 	}
2898 }
2899 
2900 struct spdk_thread *
2901 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2902 {
2903 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
2904 }
2905 
2906 static void
2907 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
2908 			   enum spdk_bdev_qos_type qos_type)
2909 {
2910 	uint64_t	min_qos_set = 0;
2911 
2912 	switch (qos_type) {
2913 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2914 		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
2915 		break;
2916 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2917 		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
2918 		break;
2919 	default:
2920 		SPDK_ERRLOG("Unsupported QoS type.\n");
2921 		return;
2922 	}
2923 
2924 	if (qos_set % min_qos_set) {
2925 		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
2926 			    qos_set, bdev->name, min_qos_set);
2927 		SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
2928 		return;
2929 	}
2930 
2931 	if (!bdev->internal.qos) {
2932 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
2933 		if (!bdev->internal.qos) {
2934 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
2935 			return;
2936 		}
2937 	}
2938 
2939 	switch (qos_type) {
2940 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2941 		bdev->internal.qos->iops_rate_limit = qos_set;
2942 		break;
2943 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2944 		bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024;
2945 		break;
2946 	default:
2947 		break;
2948 	}
2949 
2950 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
2951 		      bdev->name, qos_type, qos_set);
2952 
2953 	return;
2954 }
2955 
2956 static void
2957 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2958 {
2959 	struct spdk_conf_section	*sp = NULL;
2960 	const char			*val = NULL;
2961 	uint64_t			qos_set = 0;
2962 	int				i = 0, j = 0;
2963 
2964 	sp = spdk_conf_find_section(NULL, "QoS");
2965 	if (!sp) {
2966 		return;
2967 	}
2968 
2969 	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
2970 		i = 0;
2971 		while (true) {
2972 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
2973 			if (!val) {
2974 				break;
2975 			}
2976 
2977 			if (strcmp(bdev->name, val) != 0) {
2978 				i++;
2979 				continue;
2980 			}
2981 
2982 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
2983 			if (val) {
2984 				qos_set = strtoull(val, NULL, 10);
2985 				_spdk_bdev_qos_config_type(bdev, qos_set, j);
2986 			}
2987 
2988 			break;
2989 		}
2990 
2991 		j++;
2992 	}
2993 
2994 	return;
2995 }
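
/*
 * The "[QoS]" section parsed above is expected to hold lines of the form
 * "<type> <bdev name> <value>". A hypothetical example (IOPS must be a multiple of 10000,
 * bandwidth is given in MB/s and must be a multiple of 10):
 *
 *	[QoS]
 *	  Limit_IOPS Malloc0 20000
 *	  Limit_BWPS Malloc0 20
 */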
2996 
2997 static int
2998 spdk_bdev_init(struct spdk_bdev *bdev)
2999 {
3000 	assert(bdev->module != NULL);
3001 
3002 	if (!bdev->name) {
3003 		SPDK_ERRLOG("Bdev name is NULL\n");
3004 		return -EINVAL;
3005 	}
3006 
3007 	if (spdk_bdev_get_by_name(bdev->name)) {
3008 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3009 		return -EEXIST;
3010 	}
3011 
3012 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
3013 	bdev->internal.measured_queue_depth = UINT64_MAX;
3014 
3015 	TAILQ_INIT(&bdev->internal.open_descs);
3016 
3017 	TAILQ_INIT(&bdev->aliases);
3018 
3019 	bdev->internal.reset_in_progress = NULL;
3020 
3021 	_spdk_bdev_qos_config(bdev);
3022 
3023 	spdk_io_device_register(__bdev_to_io_dev(bdev),
3024 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3025 				sizeof(struct spdk_bdev_channel));
3026 
3027 	pthread_mutex_init(&bdev->internal.mutex, NULL);
3028 	return 0;
3029 }
3030 
3031 static void
3032 spdk_bdev_destroy_cb(void *io_device)
3033 {
3034 	int			rc;
3035 	struct spdk_bdev	*bdev;
3036 	spdk_bdev_unregister_cb	cb_fn;
3037 	void			*cb_arg;
3038 
3039 	bdev = __bdev_from_io_dev(io_device);
3040 	cb_fn = bdev->internal.unregister_cb;
3041 	cb_arg = bdev->internal.unregister_ctx;
3042 
3043 	rc = bdev->fn_table->destruct(bdev->ctxt);
3044 	if (rc < 0) {
3045 		SPDK_ERRLOG("destruct failed\n");
3046 	}
3047 	if (rc <= 0 && cb_fn != NULL) {
3048 		cb_fn(cb_arg, rc);
3049 	}
3050 }
3051 
3052 
3053 static void
3054 spdk_bdev_fini(struct spdk_bdev *bdev)
3055 {
3056 	pthread_mutex_destroy(&bdev->internal.mutex);
3057 
3058 	free(bdev->internal.qos);
3059 
3060 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3061 }
3062 
3063 static void
3064 spdk_bdev_start(struct spdk_bdev *bdev)
3065 {
3066 	struct spdk_bdev_module *module;
3067 	uint32_t action;
3068 
3069 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3070 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3071 
3072 	/* Examine configuration before initializing I/O */
3073 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3074 		if (module->examine_config) {
3075 			action = module->internal.action_in_progress;
3076 			module->internal.action_in_progress++;
3077 			module->examine_config(bdev);
3078 			if (action != module->internal.action_in_progress) {
3079 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3080 					    module->name);
3081 			}
3082 		}
3083 	}
3084 
3085 	if (bdev->internal.claim_module) {
3086 		return;
3087 	}
3088 
3089 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3090 		if (module->examine_disk) {
3091 			module->internal.action_in_progress++;
3092 			module->examine_disk(bdev);
3093 		}
3094 	}
3095 }
3096 
3097 int
3098 spdk_bdev_register(struct spdk_bdev *bdev)
3099 {
3100 	int rc = spdk_bdev_init(bdev);
3101 
3102 	if (rc == 0) {
3103 		spdk_bdev_start(bdev);
3104 	}
3105 
3106 	return rc;
3107 }
3108 
3109 int
3110 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3111 {
3112 	int rc;
3113 
3114 	rc = spdk_bdev_init(vbdev);
3115 	if (rc) {
3116 		return rc;
3117 	}
3118 
3119 	spdk_bdev_start(vbdev);
3120 	return 0;
3121 }
3122 
3123 void
3124 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3125 {
3126 	if (bdev->internal.unregister_cb != NULL) {
3127 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3128 	}
3129 }
3130 
3131 static void
3132 _remove_notify(void *arg)
3133 {
3134 	struct spdk_bdev_desc *desc = arg;
3135 
3136 	desc->remove_cb(desc->remove_ctx);
3137 }
3138 
3139 void
3140 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3141 {
3142 	struct spdk_bdev_desc	*desc, *tmp;
3143 	bool			do_destruct = true;
3144 	struct spdk_thread	*thread;
3145 
3146 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3147 
3148 	thread = spdk_get_thread();
3149 	if (!thread) {
3150 		/* The user called this from a non-SPDK thread. */
3151 		if (cb_fn != NULL) {
3152 			cb_fn(cb_arg, -ENOTSUP);
3153 		}
3154 		return;
3155 	}
3156 
3157 	pthread_mutex_lock(&bdev->internal.mutex);
3158 
3159 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3160 	bdev->internal.unregister_cb = cb_fn;
3161 	bdev->internal.unregister_ctx = cb_arg;
3162 
3163 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3164 		if (desc->remove_cb) {
3165 			do_destruct = false;
3166 			/*
3167 			 * Defer invocation of the remove_cb to a separate message that will
3168 			 *  run later on this thread.  This ensures this context unwinds and
3169 			 *  we don't recursively unregister this bdev again if the remove_cb
3170 			 *  immediately closes its descriptor.
3171 			 */
3172 			if (!desc->remove_scheduled) {
3173 				/* Avoid scheduling removal of the same descriptor multiple times. */
3174 				desc->remove_scheduled = true;
3175 				spdk_thread_send_msg(thread, _remove_notify, desc);
3176 			}
3177 		}
3178 	}
3179 
3180 	if (!do_destruct) {
3181 		pthread_mutex_unlock(&bdev->internal.mutex);
3182 		return;
3183 	}
3184 
3185 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3186 	pthread_mutex_unlock(&bdev->internal.mutex);
3187 
3188 	spdk_bdev_fini(bdev);
3189 }
3190 
3191 int
3192 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3193 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3194 {
3195 	struct spdk_bdev_desc *desc;
3196 
3197 	desc = calloc(1, sizeof(*desc));
3198 	if (desc == NULL) {
3199 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3200 		return -ENOMEM;
3201 	}
3202 
3203 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3204 		      spdk_get_thread());
3205 
3206 	pthread_mutex_lock(&bdev->internal.mutex);
3207 
3208 	if (write && bdev->internal.claim_module) {
3209 		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3210 			    bdev->name, bdev->internal.claim_module->name);
3211 		free(desc);
3212 		pthread_mutex_unlock(&bdev->internal.mutex);
3213 		return -EPERM;
3214 	}
3215 
3216 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3217 
3218 	desc->bdev = bdev;
3219 	desc->remove_cb = remove_cb;
3220 	desc->remove_ctx = remove_ctx;
3221 	desc->write = write;
3222 	*_desc = desc;
3223 
3224 	pthread_mutex_unlock(&bdev->internal.mutex);
3225 
3226 	return 0;
3227 }
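
/*
 * Descriptor lifecycle sketch (illustrative; `hotremove_cb` and `app_ctx` are hypothetical).
 * The remove callback fires when the bdev is being unregistered and should eventually close
 * the descriptor so the pending unregister can complete:
 *
 *	struct spdk_bdev_desc *desc;
 *
 *	if (spdk_bdev_open(bdev, true, hotremove_cb, app_ctx, &desc) == 0) {
 *		struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
 *		// ... submit I/O on this thread ...
 *		spdk_put_io_channel(ch);
 *		spdk_bdev_close(desc);
 *	}
 */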
3228 
3229 void
3230 spdk_bdev_close(struct spdk_bdev_desc *desc)
3231 {
3232 	struct spdk_bdev *bdev = desc->bdev;
3233 	bool do_unregister = false;
3234 
3235 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3236 		      spdk_get_thread());
3237 
3238 	pthread_mutex_lock(&bdev->internal.mutex);
3239 
3240 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3241 	free(desc);
3242 
3243 	/* If no more descriptors, kill QoS channel */
3244 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3245 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3246 			      bdev->name, spdk_get_thread());
3247 
3248 		if (spdk_bdev_qos_destroy(bdev)) {
3249 			/* There isn't anything we can do to recover here. Just let the
3250 			 * old QoS poller keep running. The QoS handling won't change
3251 			 * cores when the user allocates a new channel, but it won't break. */
3252 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3253 		}
3254 	}
3255 
3256 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3257 
3258 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3259 		do_unregister = true;
3260 	}
3261 	pthread_mutex_unlock(&bdev->internal.mutex);
3262 
3263 	if (do_unregister == true) {
3264 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3265 	}
3266 }
3267 
3268 int
3269 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3270 			    struct spdk_bdev_module *module)
3271 {
3272 	if (bdev->internal.claim_module != NULL) {
3273 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3274 			    bdev->internal.claim_module->name);
3275 		return -EPERM;
3276 	}
3277 
3278 	if (desc && !desc->write) {
3279 		desc->write = true;
3280 	}
3281 
3282 	bdev->internal.claim_module = module;
3283 	return 0;
3284 }
3285 
3286 void
3287 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3288 {
3289 	assert(bdev->internal.claim_module != NULL);
3290 	bdev->internal.claim_module = NULL;
3291 }
3292 
3293 struct spdk_bdev *
3294 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3295 {
3296 	return desc->bdev;
3297 }
3298 
3299 void
3300 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3301 {
3302 	struct iovec *iovs;
3303 	int iovcnt;
3304 
3305 	if (bdev_io == NULL) {
3306 		return;
3307 	}
3308 
3309 	switch (bdev_io->type) {
3310 	case SPDK_BDEV_IO_TYPE_READ:
3311 		iovs = bdev_io->u.bdev.iovs;
3312 		iovcnt = bdev_io->u.bdev.iovcnt;
3313 		break;
3314 	case SPDK_BDEV_IO_TYPE_WRITE:
3315 		iovs = bdev_io->u.bdev.iovs;
3316 		iovcnt = bdev_io->u.bdev.iovcnt;
3317 		break;
3318 	default:
3319 		iovs = NULL;
3320 		iovcnt = 0;
3321 		break;
3322 	}
3323 
3324 	if (iovp) {
3325 		*iovp = iovs;
3326 	}
3327 	if (iovcntp) {
3328 		*iovcntp = iovcnt;
3329 	}
3330 }
3331 
3332 void
3333 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3334 {
3335 
3336 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3337 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3338 		assert(false);
3339 	}
3340 
3341 	if (bdev_module->async_init) {
3342 		bdev_module->internal.action_in_progress = 1;
3343 	}
3344 
3345 	/*
3346 	 * Modules with examine callbacks must be initialized first, so they are
3347 	 *  ready to handle examine callbacks from later modules that will
3348 	 *  register physical bdevs.
3349 	 */
3350 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3351 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3352 	} else {
3353 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3354 	}
3355 }
3356 
3357 struct spdk_bdev_module *
3358 spdk_bdev_module_list_find(const char *name)
3359 {
3360 	struct spdk_bdev_module *bdev_module;
3361 
3362 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3363 		if (strcmp(name, bdev_module->name) == 0) {
3364 			break;
3365 		}
3366 	}
3367 
3368 	return bdev_module;
3369 }
3370 
3371 static void
3372 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
3373 {
3374 	struct spdk_bdev_io *bdev_io = _bdev_io;
3375 	uint64_t num_bytes, num_blocks;
3376 	int rc;
3377 
3378 	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
3379 			     bdev_io->u.bdev.split_remaining_num_blocks,
3380 			     ZERO_BUFFER_SIZE);
3381 	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
3382 
3383 	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
3384 				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
3385 				    g_bdev_mgr.zero_buffer,
3386 				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
3387 				    _spdk_bdev_write_zero_buffer_done, bdev_io);
3388 	if (rc == 0) {
3389 		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
3390 		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
3391 	} else if (rc == -ENOMEM) {
3392 		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
3393 		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next;
3394 		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
3395 		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
3396 					&bdev_io->internal.waitq_entry);
3397 	} else {
3398 		/* This should never happen. */
3399 		assert(false);
3400 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3401 		bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx);
3402 	}
3403 }
3404 
3405 static void
3406 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3407 {
3408 	struct spdk_bdev_io *parent_io = cb_arg;
3409 
3410 	if (!success) {
3411 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3412 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx);
3413 		return;
3414 	}
3415 
3416 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
3417 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3418 		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx);
3419 		return;
3420 	}
3421 
3422 	_spdk_bdev_write_zero_buffer_next(parent_io);
3423 }
3424 
3425 struct set_qos_limit_ctx {
3426 	void (*cb_fn)(void *cb_arg, int status);
3427 	void *cb_arg;
3428 	struct spdk_bdev *bdev;
3429 };
3430 
3431 static void
3432 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3433 {
3434 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3435 	ctx->bdev->internal.qos_mod_in_progress = false;
3436 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3437 
3438 	ctx->cb_fn(ctx->cb_arg, status);
3439 	free(ctx);
3440 }
3441 
3442 static void
3443 _spdk_bdev_disable_qos_done(void *cb_arg)
3444 {
3445 	struct set_qos_limit_ctx *ctx = cb_arg;
3446 	struct spdk_bdev *bdev = ctx->bdev;
3447 	struct spdk_bdev_io *bdev_io;
3448 	struct spdk_bdev_qos *qos;
3449 
3450 	pthread_mutex_lock(&bdev->internal.mutex);
3451 	qos = bdev->internal.qos;
3452 	bdev->internal.qos = NULL;
3453 	pthread_mutex_unlock(&bdev->internal.mutex);
3454 
3455 	while (!TAILQ_EMPTY(&qos->queued)) {
3456 		/* Send queued I/O back to their original thread for resubmission. */
3457 		bdev_io = TAILQ_FIRST(&qos->queued);
3458 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3459 
3460 		if (bdev_io->internal.io_submit_ch) {
3461 			/*
3462 			 * Channel was changed when sending it to the QoS thread - change it back
3463 			 *  before sending it back to the original thread.
3464 			 */
3465 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3466 			bdev_io->internal.io_submit_ch = NULL;
3467 		}
3468 
3469 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3470 				     _spdk_bdev_io_submit, bdev_io);
3471 	}
3472 
3473 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3474 	spdk_poller_unregister(&qos->poller);
3475 
3476 	free(qos);
3477 
3478 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3479 }
3480 
3481 static void
3482 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3483 {
3484 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3485 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3486 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3487 	struct spdk_thread *thread;
3488 
3489 	pthread_mutex_lock(&bdev->internal.mutex);
3490 	thread = bdev->internal.qos->thread;
3491 	pthread_mutex_unlock(&bdev->internal.mutex);
3492 
3493 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3494 }
3495 
3496 static void
3497 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3498 {
3499 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3500 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3501 
3502 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3503 
3504 	spdk_for_each_channel_continue(i, 0);
3505 }
3506 
3507 static void
3508 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
3509 {
3510 	struct set_qos_limit_ctx *ctx = cb_arg;
3511 	struct spdk_bdev *bdev = ctx->bdev;
3512 
3513 	pthread_mutex_lock(&bdev->internal.mutex);
3514 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3515 	pthread_mutex_unlock(&bdev->internal.mutex);
3516 
3517 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3518 }
3519 
3520 static void
3521 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3522 {
3523 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3524 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3525 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3526 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3527 
3528 	pthread_mutex_lock(&bdev->internal.mutex);
3529 	_spdk_bdev_enable_qos(bdev, bdev_ch);
3530 	pthread_mutex_unlock(&bdev->internal.mutex);
3531 	spdk_for_each_channel_continue(i, 0);
3532 }
3533 
3534 static void
3535 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3536 {
3537 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3538 
3539 	_spdk_bdev_set_qos_limit_done(ctx, status);
3540 }
3541 
3542 void
3543 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
3544 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3545 {
3546 	struct set_qos_limit_ctx *ctx;
3547 
3548 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
3549 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
3550 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
3551 		cb_fn(cb_arg, -EINVAL);
3552 		return;
3553 	}
3554 
3555 	ctx = calloc(1, sizeof(*ctx));
3556 	if (ctx == NULL) {
3557 		cb_fn(cb_arg, -ENOMEM);
3558 		return;
3559 	}
3560 
3561 	ctx->cb_fn = cb_fn;
3562 	ctx->cb_arg = cb_arg;
3563 	ctx->bdev = bdev;
3564 
3565 	pthread_mutex_lock(&bdev->internal.mutex);
3566 	if (bdev->internal.qos_mod_in_progress) {
3567 		pthread_mutex_unlock(&bdev->internal.mutex);
3568 		free(ctx);
3569 		cb_fn(cb_arg, -EAGAIN);
3570 		return;
3571 	}
3572 	bdev->internal.qos_mod_in_progress = true;
3573 
3574 	if (ios_per_sec > 0) {
3575 		if (bdev->internal.qos == NULL) {
3576 			/* Enabling */
3577 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3578 			if (!bdev->internal.qos) {
3579 				pthread_mutex_unlock(&bdev->internal.mutex);
3580 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3581 				free(ctx);
3582 				cb_fn(cb_arg, -ENOMEM);
3583 				return;
3584 			}
3585 
3586 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3587 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3588 					      _spdk_bdev_enable_qos_msg, ctx,
3589 					      _spdk_bdev_enable_qos_done);
3590 		} else {
3591 			/* Updating */
3592 			bdev->internal.qos->iops_rate_limit = ios_per_sec;
3593 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3594 		}
3595 	} else {
3596 		if (bdev->internal.qos != NULL) {
3597 			/* Disabling */
3598 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3599 					      _spdk_bdev_disable_qos_msg, ctx,
3600 					      _spdk_bdev_disable_qos_msg_done);
3601 		} else {
3602 			pthread_mutex_unlock(&bdev->internal.mutex);
3603 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3604 			return;
3605 		}
3606 	}
3607 
3608 	pthread_mutex_unlock(&bdev->internal.mutex);
3609 }
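
/*
 * Runtime QoS sketch (illustrative callback). Passing 0 disables the rate limit; non-zero
 * values must be a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC (10000):
 *
 *	static void
 *	qos_done(void *cb_arg, int status)
 *	{
 *		// status is 0 on success, -EAGAIN if another QoS change is already in flight
 *	}
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, qos_done, NULL);
 */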
3610 
3611 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3612